|
Akka/Scala example source code file (FaultHandlingDocSample.scala)
The FaultHandlingDocSample.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package docs.actor import language.postfixOps //#all //#imports import akka.actor._ import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ import akka.util.Timeout import akka.event.LoggingReceive import akka.pattern.{ ask, pipe } import com.typesafe.config.ConfigFactory //#imports /** * Runs the sample */ object FaultHandlingDocSample extends App { import Worker._ val config = ConfigFactory.parseString(""" akka.loglevel = "DEBUG" akka.actor.debug { receive = on lifecycle = on } """) val system = ActorSystem("FaultToleranceSample", config) val worker = system.actorOf(Props[Worker], name = "worker") val listener = system.actorOf(Props[Listener], name = "listener") // start the work and listen on progress // note that the listener is used as sender of the tell, // i.e. it will receive replies from the worker worker.tell(Start, sender = listener) } /** * Listens on progress from the worker and shuts down the system when enough * work has been done. */ class Listener extends Actor with ActorLogging { import Worker._ // If we don't get any progress within 15 seconds then the service is unavailable context.setReceiveTimeout(15 seconds) def receive = { case Progress(percent) => log.info("Current progress: {} %", percent) if (percent >= 100.0) { log.info("That's all, shutting down") context.system.shutdown() } case ReceiveTimeout => // No progress within 15 seconds, ServiceUnavailable log.error("Shutting down due to unavailable service") context.system.shutdown() } } //#messages object Worker { case object Start case object Do final case class Progress(percent: Double) } //#messages /** * Worker performs some work when it receives the `Start` message. * It will continuously notify the sender of the `Start` message * of current ``Progress``. The `Worker` supervise the `CounterService`. */ class Worker extends Actor with ActorLogging { import Worker._ import CounterService._ implicit val askTimeout = Timeout(5 seconds) // Stop the CounterService child if it throws ServiceUnavailable override val supervisorStrategy = OneForOneStrategy() { case _: CounterService.ServiceUnavailable => Stop } // The sender of the initial Start message will continuously be notified // about progress var progressListener: Option[ActorRef] = None val counterService = context.actorOf(Props[CounterService], name = "counter") val totalCount = 51 import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext def receive = LoggingReceive { case Start if progressListener.isEmpty => progressListener = Some(sender()) context.system.scheduler.schedule(Duration.Zero, 1 second, self, Do) case Do => counterService ! Increment(1) counterService ! Increment(1) counterService ! Increment(1) // Send current progress to the initial sender counterService ? GetCurrentCount map { case CurrentCount(_, count) => Progress(100.0 * count / totalCount) } pipeTo progressListener.get } } //#messages object CounterService { final case class Increment(n: Int) case object GetCurrentCount final case class CurrentCount(key: String, count: Long) class ServiceUnavailable(msg: String) extends RuntimeException(msg) private case object Reconnect } //#messages /** * Adds the value received in `Increment` message to a persistent * counter. Replies with `CurrentCount` when it is asked for `CurrentCount`. * `CounterService` supervise `Storage` and `Counter`. */ class CounterService extends Actor { import CounterService._ import Counter._ import Storage._ // Restart the storage child when StorageException is thrown. // After 3 restarts within 5 seconds it will be stopped. override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds) { case _: Storage.StorageException => Restart } val key = self.path.name var storage: Option[ActorRef] = None var counter: Option[ActorRef] = None var backlog = IndexedSeq.empty[(ActorRef, Any)] val MaxBacklog = 10000 import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext override def preStart() { initStorage() } /** * The child storage is restarted in case of failure, but after 3 restarts, * and still failing it will be stopped. Better to back-off than continuously * failing. When it has been stopped we will schedule a Reconnect after a delay. * Watch the child so we receive Terminated message when it has been terminated. */ def initStorage() { storage = Some(context.watch(context.actorOf(Props[Storage], name = "storage"))) // Tell the counter, if any, to use the new storage counter foreach { _ ! UseStorage(storage) } // We need the initial value to be able to operate storage.get ! Get(key) } def receive = LoggingReceive { case Entry(k, v) if k == key && counter == None => // Reply from Storage of the initial value, now we can create the Counter val c = context.actorOf(Props(classOf[Counter], key, v)) counter = Some(c) // Tell the counter to use current storage c ! UseStorage(storage) // and send the buffered backlog to the counter for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo) backlog = IndexedSeq.empty case msg @ Increment(n) => forwardOrPlaceInBacklog(msg) case msg @ GetCurrentCount => forwardOrPlaceInBacklog(msg) case Terminated(actorRef) if Some(actorRef) == storage => // After 3 restarts the storage child is stopped. // We receive Terminated because we watch the child, see initStorage. storage = None // Tell the counter that there is no storage for the moment counter foreach { _ ! UseStorage(None) } // Try to re-establish storage after while context.system.scheduler.scheduleOnce(10 seconds, self, Reconnect) case Reconnect => // Re-establish storage after the scheduled delay initStorage() } def forwardOrPlaceInBacklog(msg: Any) { // We need the initial value from storage before we can start delegate to // the counter. Before that we place the messages in a backlog, to be sent // to the counter when it is initialized. counter match { case Some(c) => c forward msg case None => if (backlog.size >= MaxBacklog) throw new ServiceUnavailable( "CounterService not available, lack of initial value") backlog :+= (sender() -> msg) } } } //#messages object Counter { final case class UseStorage(storage: Option[ActorRef]) } //#messages /** * The in memory count variable that will send current * value to the `Storage`, if there is any storage * available at the moment. */ class Counter(key: String, initialValue: Long) extends Actor { import Counter._ import CounterService._ import Storage._ var count = initialValue var storage: Option[ActorRef] = None def receive = LoggingReceive { case UseStorage(s) => storage = s storeCount() case Increment(n) => count += n storeCount() case GetCurrentCount => sender() ! CurrentCount(key, count) } def storeCount() { // Delegate dangerous work, to protect our valuable state. // We can continue without storage. storage foreach { _ ! Store(Entry(key, count)) } } } //#messages object Storage { final case class Store(entry: Entry) final case class Get(key: String) final case class Entry(key: String, value: Long) class StorageException(msg: String) extends RuntimeException(msg) } //#messages /** * Saves key/value pairs to persistent storage when receiving `Store` message. * Replies with current value when receiving `Get` message. * Will throw StorageException if the underlying data store is out of order. */ class Storage extends Actor { import Storage._ val db = DummyDB def receive = LoggingReceive { case Store(Entry(key, count)) => db.save(key, count) case Get(key) => sender() ! Entry(key, db.load(key).getOrElse(0L)) } } //#dummydb object DummyDB { import Storage.StorageException private var db = Map[String, Long]() @throws(classOf[StorageException]) def save(key: String, value: Long): Unit = synchronized { if (11 <= value && value <= 14) throw new StorageException("Simulated store failure " + value) db += (key -> value) } @throws(classOf[StorageException]) def load(key: String): Option[Long] = synchronized { db.get(key) } } //#dummydb //#all Other Akka source code examplesHere is a short list of links related to this Akka FaultHandlingDocSample.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.