alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ThrottlerTransportAdapterSpec.scala)

This example Akka source code file (ThrottlerTransportAdapterSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, address, akka, blackhole, boolean, bytespersecond, concurrent, lost, messagecount, pingpacketsize, remote, test, testing, testkit, throttlertransportadapter, throttlertransportadapterspec

The ThrottlerTransportAdapterSpec.scala Akka example source code

package akka.remote.transport

import com.typesafe.config.{ ConfigFactory, Config }
import akka.actor._
import akka.testkit.{ TimingTest, DefaultTimeout, ImplicitSender, AkkaSpec }
import ThrottlerTransportAdapterSpec._
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.remote.transport.ThrottlerTransportAdapter._
import akka.remote.RemoteActorRefProvider
import akka.testkit.TestEvent
import akka.testkit.EventFilter
import akka.remote.EndpointException

object ThrottlerTransportAdapterSpec {
  val configA: Config = ConfigFactory parseString ("""
    akka {
      actor.provider = "akka.remote.RemoteActorRefProvider"

      remote.netty.tcp.hostname = "localhost"
      remote.log-remote-lifecycle-events = off
      remote.retry-gate-closed-for = 1 s
      remote.transport-failure-detector.heartbeat-interval = 1 s
      remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s

      remote.netty.tcp.applied-adapters = ["trttl"]
      remote.netty.tcp.port = 0
    }
                                                   """)

  class Echo extends Actor {
    override def receive = {
      case "ping" ⇒ sender() ! "pong"
      case x      ⇒ sender() ! x
    }
  }

  val PingPacketSize = 148
  val MessageCount = 30
  val BytesPerSecond = 500
  val TotalTime: Long = (MessageCount * PingPacketSize) / BytesPerSecond

  class ThrottlingTester(remote: ActorRef, controller: ActorRef) extends Actor {
    var messageCount = MessageCount
    var received = 0
    var startTime = 0L

    override def receive = {
      case "start" ⇒
        self ! "sendNext"
        startTime = System.nanoTime()
      case "sendNext" ⇒ if (messageCount > 0) {
        remote ! "ping"
        self ! "sendNext"
        messageCount -= 1
      }
      case "pong" ⇒
        received += 1
        if (received >= MessageCount) controller ! (System.nanoTime() - startTime)
    }
  }

  final case class Lost(msg: String)
}

class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSender with DefaultTimeout {

  val systemB = ActorSystem("systemB", system.settings.config)
  val remote = systemB.actorOf(Props[Echo], "echo")

  val rootB = RootActorPath(systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress)
  val here = {
    system.actorSelection(rootB / "user" / "echo") ! Identify(None)
    expectMsgType[ActorIdentity].ref.get
  }

  def throttle(direction: Direction, mode: ThrottleMode): Boolean = {
    val rootBAddress = Address("akka", "systemB", "localhost", rootB.address.port.get)
    val transport = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport
    Await.result(transport.managementCommand(SetThrottle(rootBAddress, direction, mode)), 3.seconds)
  }

  def disassociate(): Boolean = {
    val rootBAddress = Address("akka", "systemB", "localhost", rootB.address.port.get)
    val transport = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport
    Await.result(transport.managementCommand(ForceDisassociate(rootBAddress)), 3.seconds)
  }

  "ThrottlerTransportAdapter" must {
    "maintain average message rate" taggedAs TimingTest in {
      throttle(Direction.Send, TokenBucket(200, 500, 0, 0)) should be(true)
      val tester = system.actorOf(Props(classOf[ThrottlingTester], here, self)) ! "start"

      val time = NANOSECONDS.toSeconds(expectMsgType[Long]((TotalTime + 3).seconds))
      log.warning("Total time of transmission: " + time)
      time should be > (TotalTime - 3)
      throttle(Direction.Send, Unthrottled) should be(true)
    }

    "survive blackholing" taggedAs TimingTest in {
      here ! Lost("Blackhole 1")
      expectMsg(Lost("Blackhole 1"))

      muteDeadLetters(classOf[Lost])(system)
      muteDeadLetters(classOf[Lost])(systemB)

      throttle(Direction.Both, Blackhole) should be(true)

      here ! Lost("Blackhole 2")
      expectNoMsg(1.seconds)
      disassociate() should be(true)
      expectNoMsg(1.seconds)

      throttle(Direction.Both, Unthrottled) should be(true)

      // after we remove the Blackhole we can't be certain of the state
      // of the connection, repeat until success
      here ! Lost("Blackhole 3")
      awaitCond({
        if (receiveOne(Duration.Zero) == Lost("Blackhole 3"))
          true
        else {
          here ! Lost("Blackhole 3")
          false
        }
      }, 15.seconds)

      here ! "Cleanup"
      fishForMessage(5.seconds) {
        case "Cleanup"           ⇒ true
        case Lost("Blackhole 3") ⇒ false
      }
    }

  }

  override def beforeTermination() {
    system.eventStream.publish(TestEvent.Mute(
      EventFilter.warning(source = "akka://AkkaProtocolStressTest/user/$a", start = "received dead letter"),
      EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
    systemB.eventStream.publish(TestEvent.Mute(
      EventFilter[EndpointException](),
      EventFilter.error(start = "AssociationError"),
      EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
  }

  override def afterTermination(): Unit = shutdown(systemB)
}

class ThrottlerTransportAdapterGenericSpec extends GenericTransportSpec(withAkkaProtocol = true) {

  def transportName = "ThrottlerTransportAdapter"
  def schemeIdentifier = "akka.trttl"
  def freshTransport(testTransport: TestTransport) =
    new ThrottlerTransportAdapter(testTransport, system.asInstanceOf[ExtendedActorSystem])

}

Other Akka source code examples

Here is a short list of links related to this Akka ThrottlerTransportAdapterSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.