|
Akka/Scala example source code file (ThrottlerTransportAdapterSpec.scala)
The ThrottlerTransportAdapterSpec.scala Akka example source code
package akka.remote.transport
import com.typesafe.config.{ ConfigFactory, Config }
import akka.actor._
import akka.testkit.{ TimingTest, DefaultTimeout, ImplicitSender, AkkaSpec }
import ThrottlerTransportAdapterSpec._
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.remote.transport.ThrottlerTransportAdapter._
import akka.remote.RemoteActorRefProvider
import akka.testkit.TestEvent
import akka.testkit.EventFilter
import akka.remote.EndpointException
/**
 * Shared fixtures for the `ThrottlerTransportAdapterSpec` test class: the remoting
 * configuration, the helper actors and the sizing constants of the throttling test.
 */
object ThrottlerTransportAdapterSpec {

  // Remoting configuration used by the spec: netty.tcp on localhost, port 0, with the
  // "trttl" adapter applied. The string content is intentionally left unindented.
  val configA: Config = ConfigFactory.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.tcp.hostname = "localhost"
remote.log-remote-lifecycle-events = off
remote.retry-gate-closed-for = 1 s
remote.transport-failure-detector.heartbeat-interval = 1 s
remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s
remote.netty.tcp.applied-adapters = ["trttl"]
remote.netty.tcp.port = 0
}
""")

  /** Answers "ping" with "pong" and sends every other message straight back to its sender. */
  class Echo extends Actor {
    override def receive: Receive = {
      case incoming ⇒
        val reply = incoming match {
          case "ping" ⇒ "pong"
          case other  ⇒ other
        }
        sender() ! reply
    }
  }

  // Sizing of the throttling test.
  // NOTE(review): PingPacketSize is presumably the on-wire size of one "ping" in bytes — confirm.
  val PingPacketSize: Int = 148
  val MessageCount: Int = 30
  val BytesPerSecond: Int = 500

  // Integer division on purpose (as in the original expression): 30 * 148 / 500 == 8.
  // The spec converts this value with `.seconds`, so it is treated as a number of seconds.
  val TotalTime: Long = (MessageCount * PingPacketSize) / BytesPerSecond

  /**
   * Sends `MessageCount` "ping" messages to `remote`, one per "sendNext" self-message,
   * and counts the "pong" replies. Once `MessageCount` replies have arrived it sends the
   * elapsed time since "start", in nanoseconds, to `controller`.
   *
   * @param remote     actor that receives the "ping" messages
   * @param controller actor that receives the elapsed nanoseconds as a `Long`
   */
  class ThrottlingTester(remote: ActorRef, controller: ActorRef) extends Actor {
    var messageCount = MessageCount
    var received = 0
    var startTime = 0L

    override def receive: Receive = {
      case "start" ⇒
        // Kick off the send loop, then take the time stamp the measurement is based on.
        self ! "sendNext"
        startTime = System.nanoTime()

      case "sendNext" ⇒
        // A "sendNext" arriving after the budget is used up is matched and ignored.
        if (messageCount > 0) {
          remote ! "ping"
          self ! "sendNext"
          messageCount -= 1
        }

      case "pong" ⇒
        received += 1
        if (received >= MessageCount) {
          val elapsedNanos = System.nanoTime() - startTime
          controller ! elapsedNanos
        }
    }
  }

  /** Payload used by the blackhole test; `msg` labels the individual message. */
  final case class Lost(msg: String)
}
/**
 * Exercises the throttler transport adapter between this spec's actor system and a
 * second system, `systemB`, which hosts an `Echo` actor named "echo".
 */
class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSender with DefaultTimeout {

  // Second actor system, started with the same configuration as this spec's system.
  val systemB = ActorSystem("systemB", system.settings.config)
  val remote = systemB.actorOf(Props[Echo], "echo")

  val rootB = RootActorPath(systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress)

  // Reference to systemB's echo actor as resolved from this spec's system via Identify.
  val here = {
    system.actorSelection(rootB / "user" / "echo") ! Identify(None)
    expectMsgType[ActorIdentity].ref.get
  }

  // Address of systemB that the management commands below are issued against.
  private def systemBAddress: Address =
    Address("akka", "systemB", "localhost", rootB.address.port.get)

  // Transport of this spec's actor system; requires the RemoteActorRefProvider set in configA.
  private def remoteTransport =
    system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport

  /**
   * Issues a `SetThrottle` management command for systemB's address.
   *
   * @param direction direction the throttle mode applies to
   * @param mode      throttle mode to install
   * @return the result of the management command, awaited for up to 3 seconds
   */
  def throttle(direction: Direction, mode: ThrottleMode): Boolean =
    Await.result(remoteTransport.managementCommand(SetThrottle(systemBAddress, direction, mode)), 3.seconds)

  /**
   * Issues a `ForceDisassociate` management command for systemB's address.
   *
   * @return the result of the management command, awaited for up to 3 seconds
   */
  def disassociate(): Boolean =
    Await.result(remoteTransport.managementCommand(ForceDisassociate(systemBAddress)), 3.seconds)

  "ThrottlerTransportAdapter" must {

    "maintain average message rate" taggedAs TimingTest in {
      // Use the shared BytesPerSecond constant (same value, 500) so the configured rate and
      // the TotalTime estimate derived from it cannot drift apart.
      throttle(Direction.Send, TokenBucket(200, BytesPerSecond, 0, 0)) should be(true)

      // The tester reports to `self`; the result of `!` is Unit, so nothing is bound here.
      system.actorOf(Props(classOf[ThrottlingTester], here, self)) ! "start"

      val time = NANOSECONDS.toSeconds(expectMsgType[Long]((TotalTime + 3).seconds))
      log.warning("Total time of transmission: " + time)
      time should be > (TotalTime - 3)
      throttle(Direction.Send, Unthrottled) should be(true)
    }

    "survive blackholing" taggedAs TimingTest in {
      // Sanity check: the echo round trip works before the blackhole is installed.
      here ! Lost("Blackhole 1")
      expectMsg(Lost("Blackhole 1"))

      muteDeadLetters(classOf[Lost])(system)
      muteDeadLetters(classOf[Lost])(systemB)

      throttle(Direction.Both, Blackhole) should be(true)

      here ! Lost("Blackhole 2")
      expectNoMsg(1.seconds)
      disassociate() should be(true)
      expectNoMsg(1.seconds)

      throttle(Direction.Both, Unthrottled) should be(true)

      // after we remove the Blackhole we can't be certain of the state
      // of the connection, repeat until success
      here ! Lost("Blackhole 3")
      awaitCond({
        if (receiveOne(Duration.Zero) == Lost("Blackhole 3"))
          true
        else {
          here ! Lost("Blackhole 3")
          false
        }
      }, 15.seconds)

      // The retry loop may have produced several echoes of "Blackhole 3"; skip them
      // until the "Cleanup" marker comes back.
      here ! "Cleanup"
      fishForMessage(5.seconds) {
        case "Cleanup"           ⇒ true
        case Lost("Blackhole 3") ⇒ false
      }
    }
  }

  /** Mutes dead-letter and association noise on both systems before termination. */
  override def beforeTermination(): Unit = {
    // NOTE(review): the source below names the "AkkaProtocolStressTest" system, which looks
    // copy-pasted from another spec; confirm whether it should refer to this spec's system.
    system.eventStream.publish(TestEvent.Mute(
      EventFilter.warning(source = "akka://AkkaProtocolStressTest/user/$a", start = "received dead letter"),
      EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
    systemB.eventStream.publish(TestEvent.Mute(
      EventFilter[EndpointException](),
      EventFilter.error(start = "AssociationError"),
      EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
  }

  /** Shuts down the second actor system after termination. */
  override def afterTermination(): Unit = shutdown(systemB)
}
/**
 * Runs the `GenericTransportSpec` suite (with `withAkkaProtocol = true`) against a
 * `ThrottlerTransportAdapter` that wraps the supplied test transport.
 */
class ThrottlerTransportAdapterGenericSpec extends GenericTransportSpec(withAkkaProtocol = true) {

  def transportName: String = "ThrottlerTransportAdapter"

  def schemeIdentifier: String = "akka.trttl"

  /** Wraps `testTransport` in a new throttler adapter bound to this spec's actor system. */
  def freshTransport(testTransport: TestTransport): ThrottlerTransportAdapter = {
    val extendedSystem = system.asInstanceOf[ExtendedActorSystem]
    new ThrottlerTransportAdapter(testTransport, extendedSystem)
  }
}
Other Akka source code examples
Here is a short list of links related to this Akka ThrottlerTransportAdapterSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.