|
Akka/Scala example source code file (StatsSample.scala)
// The StatsSample.scala Akka example source code

package sample.cluster.stats

import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.PoisonPill
import akka.actor.Props
import akka.actor.RelativeActorPath
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus

/**
 * Entry point that starts one actor system per requested port, each hosting a
 * `statsWorker` and a `statsService` actor.
 */
object StatsSample {

  /**
   * With no arguments, starts three systems (ports 2551, 2552 and 0) and then
   * launches [[StatsSampleClient]]; otherwise starts one system per given port.
   *
   * @param args ports to start actor systems on; may be empty
   */
  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      startup(Seq("2551", "2552", "0"))
      StatsSampleClient.main(Array.empty)
    } else {
      startup(args)
    }
  }

  /**
   * Creates an actor system named "ClusterSystem" for every port in `ports`.
   *
   * @param ports values substituted into `akka.remote.netty.tcp.port`
   */
  def startup(ports: Seq[String]): Unit = {
    ports.foreach { port =>
      // Override the configuration of the port when specified as program argument
      val config = ConfigFactory
        .parseString(s"akka.remote.netty.tcp.port=$port")
        .withFallback(ConfigFactory.parseString("akka.cluster.roles = [compute]"))
        .withFallback(ConfigFactory.load("stats1"))

      val system = ActorSystem("ClusterSystem", config)
      system.actorOf(Props[StatsWorker], name = "statsWorker")
      system.actorOf(Props[StatsService], name = "statsService")
    }
  }
}

/**
 * Entry point that starts a single actor system hosting the client actor.
 */
object StatsSampleClient {

  /**
   * Starts "ClusterSystem" with the default configuration and creates the
   * `client` actor pointed at `/user/statsService`.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // note that client is not a compute node, role not defined
    val system = ActorSystem("ClusterSystem")
    system.actorOf(Props(classOf[StatsSampleClient], "/user/statsService"), "client")
  }
}

/**
 * Actor that sends a [[StatsJob]] on every "tick" to the service at
 * `servicePath` on a randomly chosen node from the tracked set, and prints
 * the replies it receives.
 *
 * @param servicePath relative actor path of the service on each node
 * @throws IllegalArgumentException if `servicePath` is not a valid relative actor path
 */
class StatsSampleClient(servicePath: String) extends Actor {
  val cluster = Cluster(context.system)

  // Validate the path once, at construction time, and keep only its elements.
  val servicePathElements = servicePath match {
    case RelativeActorPath(elements) => elements
    case _ =>
      throw new IllegalArgumentException(
        s"servicePath [$servicePath] is not a valid relative actor path")
  }

  import context.dispatcher

  // Sends "tick" to this actor every 2 seconds, starting after 2 seconds.
  val tickTask = context.system.scheduler.schedule(2.seconds, 2.seconds, self, "tick")

  // Addresses of the nodes currently considered usable for sending jobs.
  var nodes = Set.empty[Address]

  /** Subscribes this actor to cluster membership and reachability events. */
  override def preStart(): Unit = {
    cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
  }

  /** Unsubscribes from cluster events and cancels the periodic tick. */
  override def postStop(): Unit = {
    cluster.unsubscribe(self)
    tickTask.cancel()
  }

  /**
   * Handles ticks, job replies, and the cluster events that maintain `nodes`.
   * A "tick" received while `nodes` is empty is not matched by the first case.
   */
  def receive: Receive = {
    case "tick" if nodes.nonEmpty =>
      // just pick any one
      val address = nodes.toIndexedSeq(ThreadLocalRandom.current.nextInt(nodes.size))
      val service = context.actorSelection(RootActorPath(address) / servicePathElements)
      service ! StatsJob("this is the text that will be analyzed")
    case result: StatsResult =>
      println(result)
    case failed: JobFailed =>
      println(failed)
    case state: CurrentClusterState =>
      // Rebuild the set from the snapshot: only "compute" members that are Up.
      nodes = state.members.collect {
        case m if m.hasRole("compute") && m.status == MemberStatus.Up => m.address
      }
    case MemberUp(m) if m.hasRole("compute") =>
      nodes += m.address
    case other: MemberEvent =>
      // Any other member event removes that member's address from the set.
      nodes -= other.member.address
    case UnreachableMember(m) =>
      nodes -= m.address
    case ReachableMember(m) if m.hasRole("compute") =>
      nodes += m.address
  }
}

// Other Akka source code examples
// Here is a short list of links related to this Akka StatsSample.scala source code file:
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.