|
Akka/Scala example source code file (StatsSample.scala)
The StatsSample.scala Akka example source code
package sample.cluster.stats
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.PoisonPill
import akka.actor.Props
import akka.actor.RelativeActorPath
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus
/**
 * Launcher for the "compute" nodes of the stats sample.
 *
 * Every node is an actor system named `ClusterSystem` that carries the
 * `compute` cluster role and hosts a `statsWorker` and a `statsService` actor.
 */
object StatsSample {

  /**
   * Program entry point.
   *
   * With no arguments, three nodes are started in this JVM (ports "2551",
   * "2552" and "0") and the sample client is started afterwards. Otherwise
   * one node is started per argument, each argument being used as the port.
   *
   * @param args ports to start nodes on; may be empty
   */
  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      startup(Seq("2551", "2552", "0"))
      StatsSampleClient.main(Array.empty)
    } else {
      startup(args)
    }
  }

  /**
   * Starts one `ClusterSystem` actor system per entry in `ports`.
   *
   * The configuration of each system is layered, highest priority first:
   * the port override, the `compute` role, then the `stats1` configuration
   * resource.
   *
   * @param ports values written verbatim into `akka.remote.netty.tcp.port`
   */
  def startup(ports: Seq[String]): Unit = {
    ports.foreach { port =>
      // Override the configuration of the port when specified as program argument
      val config = ConfigFactory
        .parseString(s"akka.remote.netty.tcp.port=$port")
        .withFallback(ConfigFactory.parseString("akka.cluster.roles = [compute]"))
        .withFallback(ConfigFactory.load("stats1"))

      val system = ActorSystem("ClusterSystem", config)
      system.actorOf(Props[StatsWorker], name = "statsWorker")
      system.actorOf(Props[StatsService], name = "statsService")
    }
  }
}
/**
 * Launcher for the sample client.
 */
object StatsSampleClient {

  /**
   * Creates a `ClusterSystem` actor system with the default configuration and
   * starts a single [[StatsSampleClient]] actor named `client` in it, pointed
   * at the relative service path `/user/statsService`.
   *
   * @param args ignored
   */
  def main(args: Array[String]): Unit = {
    val clientProps = Props(classOf[StatsSampleClient], "/user/statsService")
    // note that client is not a compute node, role not defined
    ActorSystem("ClusterSystem").actorOf(clientProps, "client")
  }
}
/**
 * Client actor that sends a fixed `StatsJob` every two seconds to the stats
 * service on a randomly chosen cluster node and prints whatever comes back.
 *
 * The candidate nodes are tracked in `nodes` from cluster events: only members
 * that have the `compute` role, are `Up` and are currently reachable are kept.
 *
 * @param servicePath relative actor path of the service on each node,
 *                    e.g. `/user/statsService`
 * @throws IllegalArgumentException if `servicePath` is not a valid relative
 *                                  actor path (raised while constructing the actor)
 */
class StatsSampleClient(servicePath: String) extends Actor {
  val cluster = Cluster(context.system)

  // Parse the path once, up front, so a malformed path fails at construction
  // instead of on the first tick.
  val servicePathElements = servicePath match {
    case RelativeActorPath(elements) => elements
    case _ => throw new IllegalArgumentException(
      "servicePath [%s] is not a valid relative actor path" format servicePath)
  }

  import context.dispatcher
  // Sends "tick" to this actor every 2 seconds, first one after 2 seconds.
  val tickTask = context.system.scheduler.schedule(2.seconds, 2.seconds, self, "tick")

  // Addresses of the nodes a job may currently be sent to.
  var nodes = Set.empty[Address]

  override def preStart(): Unit = {
    cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
    tickTask.cancel()
  }

  def receive: Receive = {
    case "tick" if nodes.nonEmpty =>
      // just pick any one
      val address = nodes.toIndexedSeq(ThreadLocalRandom.current.nextInt(nodes.size))
      val service = context.actorSelection(RootActorPath(address) / servicePathElements)
      service ! StatsJob("this is the text that will be analyzed")
    case result: StatsResult =>
      println(result)
    case failed: JobFailed =>
      println(failed)
    case state: CurrentClusterState =>
      // Members listed as unreachable in the snapshot are skipped, matching
      // what the UnreachableMember handler below does for live events.
      nodes = state.members.collect {
        case m if m.hasRole("compute") &&
          m.status == MemberStatus.Up &&
          !state.unreachable.contains(m) => m.address
      }
    case MemberUp(m) if m.hasRole("compute") => nodes += m.address
    // Any other membership change means the member is no longer (or not yet) Up.
    case other: MemberEvent => nodes -= other.member.address
    case UnreachableMember(m) => nodes -= m.address
    // Re-add only members that are Up, the same rule the snapshot handler applies.
    case ReachableMember(m) if m.hasRole("compute") && m.status == MemberStatus.Up =>
      nodes += m.address
  }
}
Other Akka source code examples

Here is a short list of links related to this Akka StatsSample.scala source code file:
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.