alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ReliableProxySpec.scala)

This example Akka source code file (ReliableProxySpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

active, actor, actorref, akka, concurrent, connecting, identify, idle, int, reliableproxymultijvmnode1, reliableproxyspec, state, test, testing, testkit, unit

The ReliableProxySpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.contrib.pattern

import language.postfixOps
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import org.scalatest.BeforeAndAfterEach
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.actor._
import akka.testkit.ImplicitSender
import scala.concurrent.duration._
import akka.actor.FSM
import akka.actor.ActorRef
import akka.testkit.TestKitExtension
import akka.actor.ActorIdentity
import akka.actor.Identify

/**
 * Multi-node configuration shared by every JVM taking part in the
 * ReliableProxy spec: it declares the two roles used by the tests.
 */
object ReliableProxySpec extends MultiNodeConfig {
  // Turn on the test transport; presumably this is what allows the spec to
  // blackhole/throttle the link through `testConductor` — confirm against
  // the MultiNodeConfig documentation.
  testTransport(on = true)

  // `local` is the role the spec runs the proxy on, `remote` the role that
  // hosts the echo target.
  val local = role("local")
  val remote = role("remote")
}

// Concrete spec classes, one per participating JVM. Both run the very same
// ReliableProxySpec body; the `MultiJvmNodeN` suffix presumably follows the
// sbt-multi-jvm naming convention used to pair classes into one test —
// confirm against the build setup.
class ReliableProxyMultiJvmNode1 extends ReliableProxySpec
class ReliableProxyMultiJvmNode2 extends ReliableProxySpec

/**
 * Multi-node spec for [[ReliableProxy]].
 *
 * The `remote` role hosts an "echo" actor that forwards everything it receives
 * to that node's `testActor`; the `local` role hosts the proxy under test and
 * subscribes `testActor` to its FSM transitions.
 *
 * The test cases are NOT independent: they run in order and share the mutable
 * `target` and `proxy` references (for example, the proxy created in
 * "initialize properly" is reused until "stop proxy if target stops and no
 * reconnection" replaces it). The `enterBarrier` calls keep both nodes in step.
 */
class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNodeSpec with BeforeAndAfterEach with ImplicitSender {
  import ReliableProxySpec._
  import ReliableProxy._

  override def initialParticipants = roles.size

  /**
   * Restores the link between the two nodes to pass-through in both directions
   * after every test, so that a blackhole/throttle left over from a failed test
   * does not leak into the next one, then synchronizes both nodes.
   */
  override def afterEach(): Unit = {
    runOn(local) {
      testConductor.passThrough(local, remote, Direction.Both).await
    }
    enterBarrier("after-each")
  }

  // Shared between test cases; start out as deadLetters until a test assigns them.
  @volatile var target: ActorRef = system.deadLetters
  @volatile var proxy: ActorRef = system.deadLetters

  /**
   * Resolves the "echo" actor on the `remote` node via `Identify` and stores
   * its reference in `target`. Fails the test with a descriptive message if
   * the reply carries no reference (i.e. the actor could not be identified).
   */
  def idTarget(): Unit = {
    system.actorSelection(node(remote) / "user" / "echo") ! Identify("echo")
    target = expectMsgType[ActorIdentity].ref.getOrElse(
      fail("echo actor on the remote node could not be identified"))
  }

  /**
   * Starts the "echo" actor on the current node (locally deployed) and stores
   * it in `target`. The actor forwards every message it receives to `testActor`.
   */
  def startTarget(): Unit = {
    target = system.actorOf(Props(new Actor {
      def receive = {
        case x ⇒ testActor ! x
      }
    }).withDeploy(Deploy.local), "echo")
  }

  /**
   * Unsubscribes `testActor` from the proxy's transitions, stops the proxy and
   * waits for the resulting `Terminated` message (the proxy must be watched).
   */
  def stopProxy(): Unit = {
    proxy ! FSM.UnsubscribeTransitionCallBack(testActor)
    system stop proxy
    expectMsgType[Terminated]
  }

  /** Expects the initial `CurrentState` notification of the proxy to be `s`. */
  def expectState(s: State) = expectMsg(FSM.CurrentState(proxy, s))

  /** Expects the proxy to report a transition from `s1` to `s2`. */
  def expectTransition(s1: State, s2: State) = expectMsg(FSM.Transition(proxy, s1, s2))

  /** Expects the proxy to report a transition from `s1` to `s2` within `max`. */
  def expectTransition(max: FiniteDuration, s1: State, s2: State) = expectMsg(max, FSM.Transition(proxy, s1, s2))

  /** Sends the integers `1` to `n` (inclusive) to the proxy, in order. */
  def sendN(n: Int): Unit = (1 to n) foreach (proxy ! _)

  /**
   * Expects the integers `1` to `n` (inclusive) in order, and checks that each
   * of them was delivered with `target` as the sender.
   */
  def expectN(n: Int): Unit = (1 to n) foreach { i ⇒ expectMsg(i); lastSender should be(target) }

  // avoid too long timeout for expectNoMsg when using dilated timeouts, because
  // blackhole will trigger failure detection
  // (the value is scaled down by the test time factor when that factor is > 1;
  // presumably expectNoMsg dilates it back to roughly one second — confirm)
  val expectNoMsgTimeout = {
    val timeFactor = TestKitExtension(system).TestTimeFactor
    if (timeFactor > 1.0) (1.0 / timeFactor).seconds else 1.second
  }

  "A ReliableProxy" must {

    "initialize properly" in {
      runOn(remote) {
        startTarget()
      }

      enterBarrier("initialize")

      runOn(local) {
        import akka.contrib.pattern.ReliableProxy

        idTarget()
        proxy = system.actorOf(ReliableProxy.props(target.path, 100.millis, 5.seconds), "proxy")
        watch(proxy)
        proxy ! FSM.SubscribeTransitionCallBack(testActor)
        expectState(Connecting)
        proxy ! "hello"
        expectMsgType[TargetChanged]
        expectTransition(Connecting, Active)
        expectTransition(Active, Idle)
      }

      runOn(remote) {
        expectMsg("hello")
      }

      enterBarrier("initialize-done")
    }

    "forward messages in sequence" in {
      runOn(local) {
        sendN(100)
        expectTransition(Idle, Active)
        expectTransition(Active, Idle)
      }
      runOn(remote) {
        within(1 second) {
          expectN(100)
        }
      }

      enterBarrier("test1a")

      // Second round: the proxy must behave the same after having gone back to Idle
      runOn(local) {
        sendN(100)
        expectTransition(Idle, Active)
        expectTransition(Active, Idle)
      }
      runOn(remote) {
        within(1 second) {
          expectN(100)
        }
      }

      enterBarrier("test1b")
    }

    "retry when sending fails" in {
      runOn(local) {
        // Drop everything sent from local to remote
        testConductor.blackhole(local, remote, Direction.Send).await
        sendN(100)
        expectTransition(1 second, Idle, Active)
        // The proxy must stay Active while nothing can be delivered
        expectNoMsg(expectNoMsgTimeout)
      }

      enterBarrier("test2a")

      runOn(remote) {
        // Nothing may have arrived while the link was blackholed
        expectNoMsg(0 seconds)
      }

      enterBarrier("test2b")

      runOn(local) {
        // Re-open the link; the outstanding messages get delivered
        testConductor.passThrough(local, remote, Direction.Send).await
        expectTransition(5 seconds, Active, Idle)
      }
      runOn(remote) {
        within(1 second) {
          expectN(100)
        }
      }

      enterBarrier("test2c")
    }

    "retry when receiving fails" in {
      runOn(local) {
        // Drop everything travelling from remote to local; the
        // messages still reach the target (checked on the remote node below)
        testConductor.blackhole(local, remote, Direction.Receive).await
        sendN(100)
        expectTransition(1 second, Idle, Active)
        // The proxy must stay Active while the inbound direction is blackholed
        expectNoMsg(expectNoMsgTimeout)
      }
      runOn(remote) {
        within(1 second) {
          expectN(100)
        }
      }

      enterBarrier("test3a")

      runOn(local) {
        testConductor.passThrough(local, remote, Direction.Receive).await
        expectTransition(5 seconds, Active, Idle)
      }

      enterBarrier("test3b")
    }

    "resend across a slow outbound link" in {
      runOn(local) {
        // the rateMBit value is derived from empirical studies so that it will trigger resends,
        // the exact value is not important, but it should not be too large
        testConductor.throttle(local, remote, Direction.Send, rateMBit = 0.02).await
        sendN(50)
        within(5 seconds) {
          expectTransition(Idle, Active)
          // use the slow link for a while, which will trigger resends
          Thread.sleep(2000)
          // full speed, and it will catch up outstanding messages
          testConductor.passThrough(local, remote, Direction.Send).await
          expectTransition(Active, Idle)
        }
      }
      runOn(remote) {
        within(5 seconds) {
          expectN(50)
        }
        // Resends must not result in duplicates reaching the target
        expectNoMsg(expectNoMsgTimeout)
      }

      enterBarrier("test4")
    }

    "resend across a slow inbound link" in {
      runOn(local) {
        testConductor.passThrough(local, remote, Direction.Send).await
        // the rateMBit value is derived from empirical studies so that it will trigger resends,
        // the exact value is not important, but it should not be too large
        testConductor.throttle(local, remote, Direction.Receive, rateMBit = 0.02).await
        sendN(50)
        within(5 seconds) {
          expectTransition(Idle, Active)
          // use the slow link for a while, which will trigger resends
          Thread.sleep(2000)
          // full speed, and it will catch up outstanding messages
          testConductor.passThrough(local, remote, Direction.Receive).await
          expectTransition(Active, Idle)
        }
      }
      runOn(remote) {
        within(1 second) {
          expectN(50)
        }
        // Resends must not result in duplicates reaching the target
        expectNoMsg(2 seconds)
      }

      enterBarrier("test5")
    }

    "reconnect to target" in {
      runOn(remote) {
        // Stop the target
        system stop target
      }

      runOn(local) {
        // After the target stops the proxy will change to Connecting
        within(5 seconds) {
          expectTransition(Idle, Connecting)
        }
        // Send some messages while it's reconnecting
        sendN(50)
      }

      enterBarrier("test6a")

      runOn(remote) {
        // Restart the target to have something to reconnect to
        startTarget()
      }

      runOn(local) {
        // After reconnecting we'll get a TargetChanged message
        // and the proxy will transition to Active to send the outstanding messages
        within(10 seconds) {
          expectMsgType[TargetChanged]
          expectTransition(Connecting, Active)
        }
      }

      enterBarrier("test6b")

      runOn(local) {
        // After the messages have been delivered, proxy is back to idle
        expectTransition(Active, Idle)
      }

      runOn(remote) {
        expectN(50)
      }

      enterBarrier("test6c")
    }

    "stop proxy if target stops and no reconnection" in {
      runOn(local) {
        stopProxy() // Stop previous proxy

        // Start new proxy with no reconnections
        proxy = system.actorOf(ReliableProxy.props(target.path, 100.millis), "proxy")
        proxy ! FSM.SubscribeTransitionCallBack(testActor)
        watch(proxy)

        expectState(Connecting)
        expectMsgType[TargetChanged]
        expectTransition(Connecting, Idle)
      }

      enterBarrier("test7a")

      runOn(remote) {
        // Stop the target, this will cause the proxy to stop
        system stop target
      }

      runOn(local) {
        within(5 seconds) {
          expectMsgType[ProxyTerminated]
          expectMsgType[Terminated]
        }
      }

      enterBarrier("test7b")

    }

    "stop proxy after max reconnections" in {
      runOn(remote) {
        // Target is not running after previous test, start it
        startTarget()
      }

      runOn(local) {
        // Get new target's ref
        idTarget()
      }

      enterBarrier("test8a")

      runOn(local) {
        // Proxy is not running after previous test
        // Start new proxy with 3 reconnections every 2 sec
        proxy = system.actorOf(ReliableProxy.props(target.path, 100.millis, 2.seconds, 3), "proxy")
        proxy ! FSM.SubscribeTransitionCallBack(testActor)
        watch(proxy)
        expectState(Connecting)
        expectMsgType[TargetChanged]
        expectTransition(Connecting, Idle)
      }

      enterBarrier("test8b")

      runOn(remote) {
        // Stop target
        system stop target
      }

      runOn(local) {
        // Wait for transition to Connecting, then send messages
        within(5 seconds) {
          expectTransition(Idle, Connecting)
        }
        sendN(50)
      }

      enterBarrier("test8c")

      runOn(local) {
        // After max reconnections, proxy stops itself.  Expect ProxyTerminated(Unsent(msgs, sender, serial)).
        within(5 * 2.seconds) {
          val proxyTerm = expectMsgType[ProxyTerminated]
          // Validate that the unsent messages are 50 ints
          val unsentInts = proxyTerm.outstanding.queue collect { case Message(i: Int, _, _) if i > 0 && i <= 50 ⇒ i }
          unsentInts should have size 50
          expectMsgType[Terminated]
        }
      }

      enterBarrier("test8d")
    }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka ReliableProxySpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.