|
Akka/Scala example source code file (StatsService.scala)
The StatsService.scala Akka example source codepackage sample.cluster.stats import scala.concurrent.duration._ import akka.actor.Actor import akka.actor.ActorRef import akka.actor.Props import akka.actor.ReceiveTimeout import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope import akka.routing.FromConfig //#service class StatsService extends Actor { // This router is used both with lookup and deploy of routees. If you // have a router with only lookup of routees you can use Props.empty // instead of Props[StatsWorker.class]. val workerRouter = context.actorOf(FromConfig.props(Props[StatsWorker]), name = "workerRouter") def receive = { case StatsJob(text) if text != "" => val words = text.split(" ") val replyTo = sender() // important to not close over sender() // create actor that collects replies from workers val aggregator = context.actorOf(Props( classOf[StatsAggregator], words.size, replyTo)) words foreach { word => workerRouter.tell( ConsistentHashableEnvelope(word, word), aggregator) } } } class StatsAggregator(expectedResults: Int, replyTo: ActorRef) extends Actor { var results = IndexedSeq.empty[Int] context.setReceiveTimeout(3.seconds) def receive = { case wordCount: Int => results = results :+ wordCount if (results.size == expectedResults) { val meanWordLength = results.sum.toDouble / results.size replyTo ! StatsResult(meanWordLength) context.stop(self) } case ReceiveTimeout => replyTo ! JobFailed("Service unavailable, try again later") context.stop(self) } } //#service Other Akka source code examplesHere is a short list of links related to this Akka StatsService.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.