|
Akka/Scala example source code file (AckedDeliverySpec.scala)
The AckedDeliverySpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.remote

import akka.testkit.AkkaSpec
import scala.annotation.tailrec
import scala.concurrent.forkjoin.ThreadLocalRandom

object AckedDeliverySpec {

  /**
   * Message type used by every test in this spec: a sequence number plus a string body.
   * Its string form shows only the raw sequence number.
   */
  final case class Sequenced(seq: SeqNo, body: String) extends HasSequenceNumber {
    override def toString = s"MSG[${seq.rawValue}]"
  }

}

/**
 * Exercises `SeqNo` comparisons, `AckedSendBuffer`, `AckedReceiveBuffer`, and finally both
 * buffers working together while transmissions are randomly dropped.
 */
class AckedDeliverySpec extends AkkaSpec {
  import AckedDeliverySpec._

  /** Builds a test message whose body is the text "msg" followed by the sequence number. */
  def msg(seq: Long) = Sequenced(SeqNo(seq), s"msg$seq")

  "SeqNo" must {

    // Checks both directions of the comparison: `lower < higher` holds and `lower > higher` does not.
    def assertBelow(lower: SeqNo, higher: SeqNo): Unit = {
      (lower < higher) should be(true)
      (lower > higher) should be(false)
    }

    "implement simple ordering" in {
      val minusOne = SeqNo(-1)
      val zero = SeqNo(0)
      val one = SeqNo(1)
      val two = SeqNo(2)

      assertBelow(minusOne, zero)
      assertBelow(zero, one)
      assertBelow(one, two)

      // A second instance built from the same raw value must compare equal.
      (SeqNo(0) == zero) should be(true)
    }

    "correctly handle wrapping over" in {
      val beforeMax = SeqNo(Long.MaxValue - 1)
      val atMax = SeqNo(Long.MaxValue)
      val atMin = SeqNo(Long.MinValue)
      val afterMin = SeqNo(Long.MinValue + 1)

      assertBelow(beforeMax, atMax)
      // Long.MaxValue is expected to be followed by Long.MinValue.
      assertBelow(atMax, atMin)
      assertBelow(atMin, afterMin)
    }

    "correctly handle large gaps" in {
      val atMin = SeqNo(Long.MinValue)
      val afterMin = SeqNo(Long.MinValue + 1)
      val zero = SeqNo(0)

      assertBelow(zero, atMin)
      assertBelow(afterMin, zero)
    }
  }

  "SendBuffer" must {

    "aggregate unacked messages in order" in {
      val first = msg(0)
      val second = msg(1)
      val third = msg(2)

      val holdingOne = new AckedSendBuffer[Sequenced](10).buffer(first)
      holdingOne.nonAcked should be(Vector(first))

      val holdingTwo = holdingOne.buffer(second)
      holdingTwo.nonAcked should be(Vector(first, second))

      val holdingThree = holdingTwo.buffer(third)
      holdingThree.nonAcked should be(Vector(first, second, third))
    }

    "refuse buffering new messages if capacity reached" in {
      // Fill a capacity-4 buffer with messages 0..3; the next one must be rejected.
      val full = (0 to 3).foldLeft(new AckedSendBuffer[Sequenced](4)) { (buf, seq) =>
        buf.buffer(msg(seq.toLong))
      }

      intercept[ResendBufferCapacityReachedException] {
        full.buffer(msg(4))
      }
    }

    "remove messages from buffer when cumulative ack received" in {
      val m0 = msg(0)
      val m1 = msg(1)
      val m2 = msg(2)
      val m3 = msg(3)
      val m4 = msg(4)

      val afterFirst = new AckedSendBuffer[Sequenced](10).buffer(m0)
      afterFirst.nonAcked should be(Vector(m0))

      val afterSecond = afterFirst.buffer(m1)
      afterSecond.nonAcked should be(Vector(m0, m1))

      val afterThird = afterSecond.buffer(m2)
      afterThird.nonAcked should be(Vector(m0, m1, m2))

      val ackedUpToOne = afterThird.acknowledge(Ack(SeqNo(1)))
      ackedUpToOne.nonAcked should be(Vector(m2))

      val afterFourth = ackedUpToOne.buffer(m3)
      afterFourth.nonAcked should be(Vector(m2, m3))

      val afterFifth = afterFourth.buffer(m4)
      afterFifth.nonAcked should be(Vector(m2, m3, m4))

      // Repeating an already processed cumulative ack must leave the buffer untouched.
      val ackedUpToOneAgain = afterFifth.acknowledge(Ack(SeqNo(1)))
      ackedUpToOneAgain.nonAcked should be(Vector(m2, m3, m4))

      val ackedUpToTwo = ackedUpToOneAgain.acknowledge(Ack(SeqNo(2)))
      ackedUpToTwo.nonAcked should be(Vector(m3, m4))

      val ackedUpToFive = ackedUpToTwo.acknowledge(Ack(SeqNo(5)))
      ackedUpToFive.nonAcked should be(Vector.empty)
    }

    "keep NACKed messages in buffer if selective nacks are received" in {
      val m0 = msg(0)
      val m1 = msg(1)
      val m2 = msg(2)
      val m3 = msg(3)
      val m4 = msg(4)

      val afterFirst = new AckedSendBuffer[Sequenced](10).buffer(m0)
      afterFirst.nonAcked should be(Vector(m0))

      val afterSecond = afterFirst.buffer(m1)
      afterSecond.nonAcked should be(Vector(m0, m1))

      val afterThird = afterSecond.buffer(m2)
      afterThird.nonAcked should be(Vector(m0, m1, m2))

      val zeroNacked = afterThird.acknowledge(Ack(SeqNo(1), nacks = Set(SeqNo(0))))
      zeroNacked.nonAcked should be(Vector(m2))
      zeroNacked.nacked should be(Vector(m0))

      val withTwoMore = zeroNacked.buffer(m3).buffer(m4)
      withTwoMore.nonAcked should be(Vector(m2, m3, m4))
      withTwoMore.nacked should be(Vector(m0))

      val twoAndThreeNacked =
        withTwoMore.acknowledge(Ack(SeqNo(4), nacks = Set(SeqNo(2), SeqNo(3))))
      twoAndThreeNacked.nonAcked should be(Vector())
      twoAndThreeNacked.nacked should be(Vector(m2, m3))

      val everythingAcked = twoAndThreeNacked.acknowledge(Ack(SeqNo(5)))
      everythingAcked.nonAcked should be(Vector.empty)
      everythingAcked.nacked should be(Vector.empty)
    }

    "throw exception if non-buffered sequence number is NACKed" in {
      // Message 0 is never buffered here, so a NACK for it cannot be served.
      val holdingOneAndTwo = new AckedSendBuffer[Sequenced](10).buffer(msg(1)).buffer(msg(2))

      intercept[ResendUnfulfillableException] {
        holdingOneAndTwo.acknowledge(Ack(SeqNo(2), nacks = Set(SeqNo(0))))
      }
    }
  }

  "ReceiveBuffer" must {

    "enqueue message in buffer if needed, return the list of deliverable messages and acks" in {
      // Feeds one message into `buf`, checks what becomes deliverable and which ack is produced,
      // and hands back the buffer returned by `extractDeliverable`.
      def receiveAndCheck(
        buf: AckedReceiveBuffer[Sequenced],
        arriving: Sequenced,
        expectedDelivery: Seq[Sequenced],
        expectedAck: Ack): AckedReceiveBuffer[Sequenced] = {
        val (next, delivered, ack) = buf.receive(arriving).extractDeliverable
        delivered should be(expectedDelivery)
        ack should be(expectedAck)
        next
      }

      val m0 = msg(0)
      val m1 = msg(1)
      val m2 = msg(2)
      val m3 = msg(3)
      val m4 = msg(4)
      val m5 = msg(5)

      val afterM1 = receiveAndCheck(
        new AckedReceiveBuffer[Sequenced], m1,
        Vector.empty, Ack(SeqNo(1), nacks = Set(SeqNo(0))))

      val afterM0 = receiveAndCheck(afterM1, m0, Vector(m0, m1), Ack(SeqNo(1)))

      val afterM4 = receiveAndCheck(
        afterM0, m4,
        Vector.empty, Ack(SeqNo(4), nacks = Set(SeqNo(2), SeqNo(3))))

      val afterM2 = receiveAndCheck(
        afterM4, m2,
        Vector(m2), Ack(SeqNo(4), nacks = Set(SeqNo(3))))

      val afterM5 = receiveAndCheck(
        afterM2, m5,
        Vector.empty, Ack(SeqNo(5), nacks = Set(SeqNo(3))))

      receiveAndCheck(afterM5, m3, Vector(m3, m4, m5), Ack(SeqNo(5)))
    }

    "handle duplicate arrivals correctly" in {
      val batch = Vector(msg(0), msg(1), msg(2))

      // Pushes the same three messages, in order, into the given buffer.
      def receiveBatch(buf: AckedReceiveBuffer[Sequenced]): AckedReceiveBuffer[Sequenced] =
        batch.foldLeft(buf) { (acc, m) => acc.receive(m) }

      val (drained, _, _) = receiveBatch(new AckedReceiveBuffer[Sequenced]).extractDeliverable

      // Second arrival of the very same messages: nothing new may be delivered.
      val (_, deliver, ack) = receiveBatch(drained).extractDeliverable

      deliver should be(Vector.empty)
      ack should be(Ack(SeqNo(2)))
    }

    "be able to correctly merge with another receive buffer" in {
      val emptyLeft = new AckedReceiveBuffer[Sequenced]
      val emptyRight = new AckedReceiveBuffer[Sequenced]

      val m0 = msg(0)
      val m1Left = msg(1)
      val m1Right = msg(1)
      val m2 = msg(2)
      val m3 = msg(3)

      val left = emptyLeft.receive(m1Left).receive(m2)
      val right = emptyRight.receive(m1Right).receive(m3)
      val merged = left.mergeFrom(right)

      val (_, deliver, ack) = merged.receive(m0).extractDeliverable
      deliver should be(Vector(m0, m1Left, m2, m3))
      ack should be(Ack(SeqNo(3)))
    }
  }

  "SendBuffer and ReceiveBuffer" must {

    // True with probability `p`, drawn from the thread-local random generator.
    def happened(p: Double): Boolean = p > ThreadLocalRandom.current().nextDouble()

    // Counts failed trials of probability `p`, stopping at the first success or at `limit`.
    @tailrec def geom(p: Double, limit: Int = 5, acc: Int = 0): Int =
      if (acc == limit || happened(p)) acc
      else geom(p, limit, acc + 1)

    "correctly cooperate with each other" in {
      val MsgCount = 1000
      val DeliveryProbability = 0.5
      val referenceList: Seq[Sequenced] = (0 until MsgCount).map(i => msg(i.toLong))

      var toSend = referenceList
      var received = Seq.empty[Sequenced]
      var sndBuf = new AckedSendBuffer[Sequenced](10)
      var rcvBuf = new AckedReceiveBuffer[Sequenced]
      var log = Vector.empty[String]
      var lastAck: Ack = Ack(SeqNo(-1))

      // The log is only printed when the final comparison fails.
      def dbgLog(message: String): Unit = log = log :+ message

      // Transmits up to `steps` messages: pending resends first, then fresh ones from `toSend`.
      // Each transmission reaches the receive buffer with probability `p`.
      def senderSteps(steps: Int, p: Double = 1.0): Unit = {
        val resends = (sndBuf.nacked ++ sndBuf.nonAcked).take(steps)
        val freshCount = steps - resends.size

        val sends =
          if (freshCount > 0) {
            val (now, later) = toSend.splitAt(freshCount)
            toSend = later
            now
          } else Seq.empty[Sequenced]

        (resends ++ sends).foreach { msg =>
          // Only messages sent for the first time are added to the send buffer.
          if (sends.contains(msg)) sndBuf = sndBuf.buffer(msg)

          if (!happened(p)) dbgLog(s"$sndBuf -- $msg --X $rcvBuf")
          else {
            val (updatedRcvBuf, delivers, ack) = rcvBuf.receive(msg).extractDeliverable
            rcvBuf = updatedRcvBuf
            dbgLog(s"$sndBuf -- $msg --> $rcvBuf")
            lastAck = ack
            received = received ++ delivers
            dbgLog(s"R: ${received.mkString(", ")}")
          }
        }
      }

      // Hands the most recent ack to the send buffer with probability `p`.
      def receiverStep(p: Double = 1.0): Unit =
        if (!happened(p)) dbgLog(s"$sndBuf X-- $lastAck -- $rcvBuf")
        else {
          sndBuf = sndBuf.acknowledge(lastAck)
          dbgLog(s"$sndBuf <-- $lastAck -- $rcvBuf")
        }

      // Dropping phase
      info(s"Starting unreliable delivery for $MsgCount messages, with delivery probability P = $DeliveryProbability")

      // Runs lossy rounds until the step budget is used up; a burst of size 0 consumes no budget.
      @tailrec def droppingPhase(remaining: Int): Unit =
        if (remaining > 0) {
          val burst = geom(0.3, limit = 5)
          senderSteps(burst, DeliveryProbability)
          receiverStep(DeliveryProbability)
          droppingPhase(remaining - burst)
        }

      droppingPhase(MsgCount * 2)

      info(s"Successfully delivered ${received.size} messages from ${MsgCount}")
      info("Entering reliable phase")

      // Finalizing phase
      (1 to MsgCount).foreach { _ =>
        senderSteps(1, 1.0)
        receiverStep(1.0)
      }

      if (received != referenceList) {
        println(log.mkString("\n"))
        println("Received:")
        println(received)
        fail("Not all messages were received")
      }

      info("All messages have been successfully delivered")
    }
  }
}

Other Akka source code examples
Here is a short list of links related to this Akka AckedDeliverySpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.