|
Akka/Scala example source code file (ReliableProxySpec.scala)
The ReliableProxySpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.contrib.pattern import language.postfixOps import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.STMultiNodeSpec import org.scalatest.BeforeAndAfterEach import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.actor._ import akka.testkit.ImplicitSender import scala.concurrent.duration._ import akka.actor.FSM import akka.actor.ActorRef import akka.testkit.TestKitExtension import akka.actor.ActorIdentity import akka.actor.Identify object ReliableProxySpec extends MultiNodeConfig { val local = role("local") val remote = role("remote") testTransport(on = true) } class ReliableProxyMultiJvmNode1 extends ReliableProxySpec class ReliableProxyMultiJvmNode2 extends ReliableProxySpec class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNodeSpec with BeforeAndAfterEach with ImplicitSender { import ReliableProxySpec._ import ReliableProxy._ override def initialParticipants = roles.size override def afterEach() { runOn(local) { testConductor.passThrough(local, remote, Direction.Both).await } enterBarrier("after-each") } @volatile var target: ActorRef = system.deadLetters @volatile var proxy: ActorRef = system.deadLetters def idTarget(): Unit = { system.actorSelection(node(remote) / "user" / "echo") ! Identify("echo") target = expectMsgType[ActorIdentity].ref.get } def startTarget(): Unit = { target = system.actorOf(Props(new Actor { def receive = { case x ⇒ testActor ! x } }).withDeploy(Deploy.local), "echo") } def stopProxy(): Unit = { proxy ! FSM.UnsubscribeTransitionCallBack(testActor) system stop proxy expectMsgType[Terminated] } def expectState(s: State) = expectMsg(FSM.CurrentState(proxy, s)) def expectTransition(s1: State, s2: State) = expectMsg(FSM.Transition(proxy, s1, s2)) def expectTransition(max: FiniteDuration, s1: State, s2: State) = expectMsg(max, FSM.Transition(proxy, s1, s2)) def sendN(n: Int) = (1 to n) foreach (proxy ! _) def expectN(n: Int) = (1 to n) foreach { n ⇒ expectMsg(n); lastSender should be(target) } // avoid too long timeout for expectNoMsg when using dilated timeouts, because // blackhole will trigger failure detection val expectNoMsgTimeout = { val timeFactor = TestKitExtension(system).TestTimeFactor if (timeFactor > 1.0) (1.0 / timeFactor).seconds else 1.second } "A ReliableProxy" must { "initialize properly" in { runOn(remote) { startTarget() } enterBarrier("initialize") runOn(local) { import akka.contrib.pattern.ReliableProxy idTarget() proxy = system.actorOf(ReliableProxy.props(target.path, 100.millis, 5.seconds), "proxy") watch(proxy) proxy ! FSM.SubscribeTransitionCallBack(testActor) expectState(Connecting) proxy ! "hello" expectMsgType[TargetChanged] expectTransition(Connecting, Active) expectTransition(Active, Idle) } runOn(remote) { expectMsg("hello") } enterBarrier("initialize-done") } "forward messages in sequence" in { runOn(local) { sendN(100) expectTransition(Idle, Active) expectTransition(Active, Idle) } runOn(remote) { within(1 second) { expectN(100) } } enterBarrier("test1a") runOn(local) { sendN(100) expectTransition(Idle, Active) expectTransition(Active, Idle) } runOn(remote) { within(1 second) { expectN(100) } } enterBarrier("test1b") } "retry when sending fails" in { runOn(local) { testConductor.blackhole(local, remote, Direction.Send).await sendN(100) expectTransition(1 second, Idle, Active) expectNoMsg(expectNoMsgTimeout) } enterBarrier("test2a") runOn(remote) { expectNoMsg(0 seconds) } enterBarrier("test2b") runOn(local) { testConductor.passThrough(local, remote, Direction.Send).await expectTransition(5 seconds, Active, Idle) } runOn(remote) { within(1 second) { expectN(100) } } enterBarrier("test2c") } "retry when receiving fails" in { runOn(local) { testConductor.blackhole(local, remote, Direction.Receive).await sendN(100) expectTransition(1 second, Idle, Active) expectNoMsg(expectNoMsgTimeout) } runOn(remote) { within(1 second) { expectN(100) } } enterBarrier("test3a") runOn(local) { testConductor.passThrough(local, remote, Direction.Receive).await expectTransition(5 seconds, Active, Idle) } enterBarrier("test3b") } "resend across a slow outbound link" in { runOn(local) { // the rateMBit value is derived from empirical studies so that it will trigger resends, // the exact value is not important, but it should not be too large testConductor.throttle(local, remote, Direction.Send, rateMBit = 0.02).await sendN(50) within(5 seconds) { expectTransition(Idle, Active) // use the slow link for a while, which will trigger resends Thread.sleep(2000) // full speed, and it will catch up outstanding messages testConductor.passThrough(local, remote, Direction.Send).await expectTransition(Active, Idle) } } runOn(remote) { within(5 seconds) { expectN(50) } expectNoMsg(expectNoMsgTimeout) } enterBarrier("test4") } "resend across a slow inbound link" in { runOn(local) { testConductor.passThrough(local, remote, Direction.Send).await // the rateMBit value is derived from empirical studies so that it will trigger resends, // the exact value is not important, but it should not be too large testConductor.throttle(local, remote, Direction.Receive, rateMBit = 0.02).await sendN(50) within(5 seconds) { expectTransition(Idle, Active) // use the slow link for a while, which will trigger resends Thread.sleep(2000) // full speed, and it will catch up outstanding messages testConductor.passThrough(local, remote, Direction.Receive).await expectTransition(Active, Idle) } } runOn(remote) { within(1 second) { expectN(50) } expectNoMsg(2 seconds) } enterBarrier("test5") } "reconnect to target" in { runOn(remote) { // Stop the target system stop target } runOn(local) { // After the target stops the proxy will change to Reconnecting within(5 seconds) { expectTransition(Idle, Connecting) } // Send some messages while it's reconnecting sendN(50) } enterBarrier("test6a") runOn(remote) { // Restart the target to have something to reconnect to startTarget() } runOn(local) { // After reconnecting a we'll get a TargetChanged message // and the proxy will transition to Active to send the outstanding messages within(10 seconds) { expectMsgType[TargetChanged] expectTransition(Connecting, Active) } } enterBarrier("test6b") runOn(local) { // After the messages have been delivered, proxy is back to idle expectTransition(Active, Idle) } runOn(remote) { expectN(50) } enterBarrier("test6c") } "stop proxy if target stops and no reconnection" in { runOn(local) { stopProxy() // Stop previous proxy // Start new proxy with no reconnections proxy = system.actorOf(ReliableProxy.props(target.path, 100.millis), "proxy") proxy ! FSM.SubscribeTransitionCallBack(testActor) watch(proxy) expectState(Connecting) expectMsgType[TargetChanged] expectTransition(Connecting, Idle) } enterBarrier("test7a") runOn(remote) { // Stop the target, this will cause the proxy to stop system stop target } runOn(local) { within(5 seconds) { expectMsgType[ProxyTerminated] expectMsgType[Terminated] } } enterBarrier("test7b") } "stop proxy after max reconnections" in { runOn(remote) { // Target is not running after previous test, start it startTarget() } runOn(local) { // Get new target's ref idTarget() } enterBarrier("test8a") runOn(local) { // Proxy is not running after previous test // Start new proxy with 3 reconnections every 2 sec proxy = system.actorOf(ReliableProxy.props(target.path, 100.millis, 2.seconds, 3), "proxy") proxy ! FSM.SubscribeTransitionCallBack(testActor) watch(proxy) expectState(Connecting) expectMsgType[TargetChanged] expectTransition(Connecting, Idle) } enterBarrier("test8b") runOn(remote) { // Stop target system stop target } runOn(local) { // Wait for transition to Connecting, then send messages within(5 seconds) { expectTransition(Idle, Connecting) } sendN(50) } enterBarrier("test8c") runOn(local) { // After max reconnections, proxy stops itself. Expect ProxyTerminated(Unsent(msgs, sender, serial)). within(5 * 2.seconds) { val proxyTerm = expectMsgType[ProxyTerminated] // Validate that the unsent messages are 50 ints val unsentInts = proxyTerm.outstanding.queue collect { case Message(i: Int, _, _) if i > 0 && i <= 50 ⇒ i } unsentInts should have size 50 expectMsgType[Terminated] } } enterBarrier("test8d") } } } Other Akka source code examplesHere is a short list of links related to this Akka ReliableProxySpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.