|
Akka/Scala example source code file (ClusterActorRefProvider.scala)
The ClusterActorRefProvider.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.Config
import akka.ConfigurationException
import akka.actor.ActorSystem
import akka.actor.ActorSystemImpl
import akka.actor.Deploy
import akka.actor.DynamicAccess
import akka.actor.InternalActorRef
import akka.actor.NoScopeGiven
import akka.actor.Scheduler
import akka.actor.Scope
import akka.actor.Terminated
import akka.dispatch.sysmsg.DeathWatchNotification
import akka.event.EventStream
import akka.japi.Util.immutableSeq
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteDeployer
import akka.remote.routing.RemoteRouterConfig
import akka.routing.RouterConfig
import akka.routing.DefaultResizer
import akka.cluster.routing.MixMetricsSelector
import akka.cluster.routing.HeapMetricsSelector
import akka.cluster.routing.SystemLoadAverageMetricsSelector
import akka.cluster.routing.CpuMetricsSelector
import akka.cluster.routing.MetricsSelector
import akka.dispatch.sysmsg.SystemMessage
import akka.actor.ActorRef
import akka.actor.Props
import akka.routing.Pool
import akka.routing.Group
import akka.cluster.routing.ClusterRouterPool
import akka.cluster.routing.ClusterRouterGroup
import com.typesafe.config.ConfigFactory
import akka.cluster.routing.ClusterRouterPoolSettings
import akka.cluster.routing.ClusterRouterGroupSettings
/**
* INTERNAL API
*
* The `ClusterActorRefProvider` will load the [[akka.cluster.Cluster]]
* extension, i.e. the cluster will automatically be started when
* the `ClusterActorRefProvider` is used.
*/
private[akka] class ClusterActorRefProvider(
_systemName: String,
_settings: ActorSystem.Settings,
_eventStream: EventStream,
_dynamicAccess: DynamicAccess) extends RemoteActorRefProvider(
_systemName, _settings, _eventStream, _dynamicAccess) {
override def init(system: ActorSystemImpl): Unit = {
super.init(system)
// initialize/load the Cluster extension
Cluster(system)
}
override protected def createRemoteWatcher(system: ActorSystemImpl): ActorRef = {
// make sure Cluster extension is initialized/loaded from init thread
Cluster(system)
import remoteSettings._
val failureDetector = createRemoteWatcherFailureDetector(system)
system.systemActorOf(ClusterRemoteWatcher.props(
failureDetector,
heartbeatInterval = WatchHeartBeatInterval,
unreachableReaperInterval = WatchUnreachableReaperInterval,
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher")
}
/**
* Factory method to make it possible to override deployer in subclass
* Creates a new instance every time
*/
override protected def createDeployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess)
}
/**
* INTERNAL API
*
* Deployer of cluster aware routers.
*/
private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extends RemoteDeployer(_settings, _pm) {
override def parseConfig(path: String, config: Config): Option[Deploy] = {
super.parseConfig(path, config) match {
case d @ Some(deploy) ⇒
if (deploy.config.getBoolean("cluster.enabled")) {
if (deploy.scope != NoScopeGiven)
throw new ConfigurationException("Cluster deployment can't be combined with scope [%s]".format(deploy.scope))
if (deploy.routerConfig.isInstanceOf[RemoteRouterConfig])
throw new ConfigurationException("Cluster deployment can't be combined with [%s]".format(deploy.routerConfig))
deploy.routerConfig match {
case r: Pool ⇒
Some(deploy.copy(
routerConfig = ClusterRouterPool(r, ClusterRouterPoolSettings.fromConfig(deploy.config)), scope = ClusterScope))
case r: Group ⇒
Some(deploy.copy(
routerConfig = ClusterRouterGroup(r, ClusterRouterGroupSettings.fromConfig(deploy.config)), scope = ClusterScope))
case other ⇒
throw new IllegalArgumentException(s"Cluster aware router can only wrap Pool or Group, got [${other.getClass.getName}]")
}
} else d
case None ⇒ None
}
}
}
@SerialVersionUID(1L)
abstract class ClusterScope extends Scope
/**
* Cluster aware scope of a [[akka.actor.Deploy]]
*/
case object ClusterScope extends ClusterScope {
/**
* Java API: get the singleton instance
*/
def getInstance = this
def withFallback(other: Scope): Scope = this
}
Other Akka source code examplesHere is a short list of links related to this Akka ClusterActorRefProvider.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.