|
Akka/Scala example source code file (FsmSimpleRedelivery.scala)
// The FsmSimpleRedelivery.scala Akka example source code
/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package sample.redelivery

import akka.actor._
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import java.util.UUID

/**
 * Companion of the [[SimpleOrderedRedeliverer]] actor: holds its `Props` factory,
 * its message protocol and the FSM state/data types.
 */
object SimpleOrderedRedeliverer {
  /**
   * Props for creating a [[SimpleOrderedRedeliverer]].
   *
   * @param retryTimeout how long to wait for an [[Ack]] before resending
   */
  def props(retryTimeout: FiniteDuration): Props =
    Props(classOf[SimpleOrderedRedeliverer], retryTimeout)

  /*
   * Messages exchanged with the requester of the delivery.
   */

  /** Request to deliver `msg` to `to`; `uuid` identifies the request in all replies. */
  case class Deliver(to: ActorRef, msg: Any, uuid: UUID)

  /** Sent to the original requester once the matching [[Ack]] has been received. */
  case class Delivered(uuid: UUID)

  /** Reply to a [[Deliver]] that was accepted while the redeliverer was [[Idle]]. */
  case class AcceptedForDelivery(uuid: UUID)

  /** Reply to a [[Deliver]] that was refused because another request is still awaiting its [[Ack]]. */
  case class Busy(refused: UUID, currentlyProcessing: UUID)

  /*
   * Messages exchanged with the “deliveree”.
   */

  /** Wrapper sent to the recipient; it is expected to answer with an [[Ack]] carrying the same `uuid`. */
  case class Ackable(from: ActorRef, msg: Any, uuid: UUID)

  /** Acknowledgement of an [[Ackable]]. */
  case class Ack(uuid: UUID)

  /*
   * Various states the [[SimpleOrderedRedeliverer]] can be in.
   */
  sealed trait State
  case object Idle extends State
  case object AwaitingAck extends State

  sealed trait Data
  case object NoData extends Data

  /**
   * Keeps track of our last delivery request.
   */
  case class LastRequest(last: Deliver, requester: ActorRef) extends Data

  /**
   * Private message used only inside of the [[SimpleOrderedRedeliverer]] to signal a tick of its retry timer.
   */
  private case object Retry
}

/**
 * An actor-in-the-middle kind. Takes care of message redelivery between two or more sides.
 *
 * Works “sequentially”, thus is able to process only one message at a time:
 *
 * <pre>
 *   Delivery-request#1 -> ACK#1 -> Delivery-request#2 -> ACK#2 -> ...
 * </pre>
 *
 * A situation like this:
 *
 * <pre>
 *   Delivery-request#1 -> Delivery-request#2 -> ...
 * </pre>
 *
 * ... will result in the second requester getting a [[SimpleOrderedRedeliverer.Busy]] message with [[UUID]]s
 * of both their request and the currently-processed one.
 *
 * @param retryTimeout how long to wait for the [[SimpleOrderedRedeliverer.Ack]] message
 */
class SimpleOrderedRedeliverer(retryTimeout: FiniteDuration)
  extends Actor with FSM[SimpleOrderedRedeliverer.State, SimpleOrderedRedeliverer.Data] {
  import SimpleOrderedRedeliverer._

  // So that we don't make a typo when referencing this timer.
  val RetryTimer = "retry"

  // Start idle with neither last request, nor most recent requester.
  startWith(Idle, NoData)

  /**
   * Will process the provided request, sending an [[Ackable]] to its recipient and resetting the inner timer.
   *
   * @param request   the delivery request to (re)send
   * @param requester the actor that asked for the delivery; kept so it can be notified later
   * @return a new post-processing state.
   */
  def process(request: Deliver, requester: ActorRef): State = {
    request.to ! Ackable(requester, request.msg, request.uuid)
    // Single-shot timer: it is re-armed on every (re)send, i.e. each time this method runs.
    setTimer(RetryTimer, Retry, retryTimeout, repeat = false)
    goto(AwaitingAck) using LastRequest(request, requester)
  }

  /*
   * When [[Idle]], accept new requests and process them, replying with [[AcceptedForDelivery]].
   */
  when(Idle) {
    case Event(request: Deliver, _) =>
      process(request, sender()) replying AcceptedForDelivery(request.uuid)
  }

  when(AwaitingAck) {

    /*
     * When awaiting the [[Ack]] and receiver seems not to have made it,
     * resend the message wrapped in [[Ackable]]. This time, however, without
     * sending [[AcceptedForDelivery]] to our requester!
     */
    case Event(Retry, LastRequest(request, requester)) =>
      process(request, requester)

    /*
     * Fortunately, the receiver made it! If this is an [[Ack]] with the correct [[UUID]],
     * cancel the retry timer, notify original requester with [[Delivered]] message,
     * and turn [[Idle]] again.
     */
    case Event(Ack(uuid), LastRequest(request, requester)) if uuid == request.uuid =>
      cancelTimer(RetryTimer)
      requester ! Delivered(uuid)
      goto(Idle) using NoData

    /*
     * Someone (possibly else!) is trying to make the [[SimpleOrderedRedeliverer]] deliver a new message,
     * while an [[Ack]] for the last one has not yet been delivered. Reject.
     */
    case Event(request: Deliver, LastRequest(current, _)) =>
      stay() replying Busy(request.uuid, current.uuid)
  }
}

object Receiver {
  /**
   * Props for creating a [[Receiver]].
   */
  def props: Props = Props(classOf[Receiver])
}

/**
 * The “deliveree”: prints every [[SimpleOrderedRedeliverer.Ackable]] it gets and
 * acknowledges only some of them, so that redelivery can be observed.
 */
class Receiver extends Actor {
  /**
   * Simulate losing 75% of all messages on the receiving end. We want to see the redelivery in action!
   *
   * @return `true` when a uniformly random draw from [0, 1) falls below 0.25
   */
  def shouldSendAck: Boolean = ThreadLocalRandom.current.nextDouble() < 0.25

  def receive: Receive = {
    case SimpleOrderedRedeliverer.Ackable(from, msg, uuid) =>
      // Decide once, so that the printed line and the actual behaviour agree.
      val goingToSendAck = shouldSendAck
      println(s""" [Receiver] got "$msg"; ${if (goingToSendAck) "" else " ***NOT***"} going to send Ack this time""")
      // Send a [[SimpleOrderedRedeliverer.Ack]] -- if they're lucky!
      if (goingToSendAck) sender() ! SimpleOrderedRedeliverer.Ack(uuid)
  }
}

object Requester {
  /**
   * Props for creating a [[Requester]].
   */
  def props: Props = Props(classOf[Requester])

  /**
   * Requester-private message used to drive the simulation.
   */
  private case object Tick
}

/**
 * Drives the simulation: on every tick it asks the redeliverer to deliver a random
 * message to the receiver, and prints whatever else it is sent.
 */
class Requester extends Actor {
  import Requester._
  import context.dispatcher

  /*
   * Create a [[SimpleOrderedRedeliverer]] and a [[Receiver]].
   */
  val redeliverer = context.actorOf(SimpleOrderedRedeliverer.props(retryTimeout = 3.seconds))
  val receiver = context.actorOf(Receiver.props)

  /*
   * One message would be quite boring, let's pick a random one of the three!
   */
  val messages = List("Hello!", "Ping!", "Howdy!")

  /*
   * Start ticking!
   */
  self ! Tick

  /**
   * Make a new request every anywhere-between-1-and-10 seconds.
   */
  def nextTickIn: FiniteDuration = (1.0 + ThreadLocalRandom.current.nextDouble() * 9.0).seconds

  def receive: Receive = {
    case Tick =>
      val msg = util.Random.shuffle(messages).head
      val uuid = UUID.randomUUID()
      println(s"""[Requester] requesting ("$msg", $uuid) to be sent to [Receiver]...""")

      /*
       * Make the actual request...
       */
      redeliverer ! SimpleOrderedRedeliverer.Deliver(receiver, msg, uuid)

      /*
       * ... and schedule a new [[Tick]].
       */
      context.system.scheduler.scheduleOnce(nextTickIn, self, Tick)

    /*
     * This case is used for displaying [[SimpleOrderedRedeliverer.AcceptedForDelivery]] and
     * [[SimpleOrderedRedeliverer.Delivered]] and [[SimpleOrderedRedeliverer.Busy]] messages.
     */
    case msg => println(s"[Requester] got $msg")
  }
}

/**
 * Entry point of the sample: creates an actor system and starts a single [[Requester]].
 */
object FsmSimpleRedelivery extends App {

  val system = ActorSystem()

  /*
   * Start a new [[Requester]] actor.
   */
  system.actorOf(Requester.props)
}
// Other Akka source code examples
// Here is a short list of links related to this Akka FsmSimpleRedelivery.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.