|
Akka/Scala example source code file (SupervisedAsk.java)
The SupervisedAsk.java Akka example source code

package docs.pattern;

import java.util.concurrent.TimeoutException;

import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import akka.actor.Actor;
import akka.actor.ActorKilledException;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Scheduler;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.actor.SupervisorStrategy.Directive;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.japi.Function;
import akka.pattern.Patterns;
import akka.util.Timeout;

/**
 * Ask-pattern helper that runs each request against a freshly created target actor
 * living under its own one-shot supervisor.
 *
 * <p>Usage: obtain a creator actor once via {@link #createSupervisorCreator(ActorRefFactory)},
 * then issue requests with {@link #askOf(ActorRef, Props, Object, Timeout)}. A failure,
 * termination, or timeout of the target is reported to the asker as a
 * {@link Status.Failure}.
 */
public class SupervisedAsk {

  /**
   * Request envelope sent to the creator and forwarded to the supervisor.
   * Fields are final because instances travel between actors as messages.
   */
  private static class AskParam {
    final Props props;
    final Object message;
    final Timeout timeout;

    AskParam(Props props, Object message, Timeout timeout) {
      this.props = props;
      this.message = message;
      this.timeout = timeout;
    }
  }

  /** Marker message the supervisor schedules to itself to signal that the timeout elapsed. */
  private static class AskTimeout {
  }

  /**
   * Spawns one {@link AskSupervisor} child per incoming {@code AskParam} and forwards the
   * request to it. Any other message is passed to {@code unhandled}.
   */
  public static class AskSupervisorCreator extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
      if (message instanceof AskParam) {
        ActorRef supervisor = getContext().actorOf(Props.create(AskSupervisor.class));
        // forward (rather than tell) so the supervisor sees the original asker as sender.
        supervisor.forward(message, getContext());
      } else {
        unhandled(message);
      }
    }
  }

  /**
   * Supervisor for a single request: creates the target actor from the supplied
   * {@code Props}, watches it, forwards the payload, and arms a timeout.
   */
  public static class AskSupervisor extends UntypedActor {
    private ActorRef targetActor;
    private ActorRef caller;
    private AskParam askParam;
    private Cancellable timeoutMessage;

    /**
     * Never restarts the child: any failure is reported to the caller as a
     * {@link Status.Failure} carrying the cause, and the child is stopped.
     */
    @Override
    public SupervisorStrategy supervisorStrategy() {
      return new OneForOneStrategy(0, Duration.Zero(),
          new Function<Throwable, Directive>() {
            public Directive apply(Throwable cause) {
              caller.tell(new Status.Failure(cause), getSelf());
              return SupervisorStrategy.stop();
            }
          });
    }

    /**
     * Handles three message types:
     * <ul>
     *   <li>{@code AskParam} - start the target, watch it, forward the payload, schedule
     *       the timeout;</li>
     *   <li>{@link Terminated} - report an {@link ActorKilledException} to the caller,
     *       cancel the timeout, stop this supervisor;</li>
     *   <li>{@code AskTimeout} - report a {@link TimeoutException} to the caller and stop
     *       this supervisor.</li>
     * </ul>
     * Everything else goes to {@code unhandled}.
     */
    @Override
    public void onReceive(Object message) throws Exception {
      if (message instanceof AskParam) {
        askParam = (AskParam) message;
        caller = getSender();
        targetActor = getContext().actorOf(askParam.props);
        getContext().watch(targetActor);
        // forward keeps the original sender, so the target's reply goes to the caller.
        targetActor.forward(askParam.message, getContext());
        Scheduler scheduler = getContext().system().scheduler();
        // The timeout marker needs no reply address, hence noSender() instead of a raw null.
        timeoutMessage = scheduler.scheduleOnce(askParam.timeout.duration(),
            getSelf(), new AskTimeout(), getContext().dispatcher(), ActorRef.noSender());
      } else if (message instanceof Terminated) {
        Throwable ex = new ActorKilledException("Target actor terminated.");
        caller.tell(new Status.Failure(ex), getSelf());
        timeoutMessage.cancel();
        getContext().stop(getSelf());
      } else if (message instanceof AskTimeout) {
        Throwable ex = new TimeoutException("Target actor timed out after "
            + askParam.timeout.toString());
        caller.tell(new Status.Failure(ex), getSelf());
        getContext().stop(getSelf());
      } else {
        unhandled(message);
      }
    }
  }

  /**
   * Asks {@code supervisorCreator} to run {@code message} against a new actor built from
   * {@code props}.
   *
   * @param supervisorCreator actor the request is sent to; see
   *        {@link #createSupervisorCreator(ActorRefFactory)}
   * @param props recipe for the target actor
   * @param message payload forwarded to the target actor
   * @param timeout used both for the underlying ask and for the supervisor's own timeout
   * @return the future returned by {@link Patterns#ask}
   */
  public static Future<Object> askOf(ActorRef supervisorCreator, Props props,
      Object message, Timeout timeout) {
    AskParam param = new AskParam(props, message, timeout);
    return Patterns.ask(supervisorCreator, param, timeout);
  }

  /**
   * Creates an {@link AskSupervisorCreator} actor in the given factory.
   *
   * @param factory actor system or context in which the creator actor is started
   * @return reference to the newly created creator actor
   */
  public static synchronized ActorRef createSupervisorCreator(
      ActorRefFactory factory) {
    return factory.actorOf(Props.create(AskSupervisorCreator.class));
  }
}

Other Akka source code examples
Here is a short list of links related to this Akka SupervisedAsk.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.