|
Akka/Scala example source code file (SystemMessageDeliveryStressTest.scala)
The SystemMessageDeliveryStressTest.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.remote.transport import akka.testkit.TimingTest import akka.testkit.DefaultTimeout import akka.testkit.ImplicitSender import akka.testkit.{ TimingTest, DefaultTimeout, ImplicitSender, AkkaSpec } import com.typesafe.config.{ Config, ConfigFactory } import AkkaProtocolStressTest._ import akka.actor._ import scala.concurrent.duration._ import akka.testkit._ import akka.remote.EndpointException import akka.remote.{ RARP, EndpointException } import akka.remote.transport.FailureInjectorTransportAdapter.{ One, All, Drop } import scala.concurrent.Await import akka.actor.ActorRef import akka.actor.Actor import akka.testkit.AkkaSpec import akka.actor.ActorSystem import akka.actor.Props import akka.actor.ExtendedActorSystem import akka.actor.RootActorPath import akka.remote.transport.FailureInjectorTransportAdapter.One import akka.remote.transport.FailureInjectorTransportAdapter.Drop import akka.testkit.TestEvent import akka.testkit.EventFilter import akka.event.Logging import akka.dispatch.sysmsg.{ Failed, SystemMessage } import akka.pattern.pipe object SystemMessageDeliveryStressTest { val baseConfig: Config = ConfigFactory parseString (""" akka { #loglevel = DEBUG actor.provider = "akka.remote.RemoteActorRefProvider" actor.serialize-messages = off remote.log-remote-lifecycle-events = on remote.transport-failure-detector { threshold = 1.0 max-sample-size = 2 min-std-deviation = 1 ms heartbeat-interval = 500 ms acceptable-heartbeat-pause = 2 s } ## Keep this setting tight, otherwise the test takes a long time or times out remote.resend-interval = 0.5 s remote.use-passive-connections = on remote.netty.tcp { applied-adapters = ["gremlin"] port = 0 } } """) class SystemMessageSequenceVerifier(system: ActorSystem, testActor: ActorRef) extends MinimalActorRef { val provider = RARP(system).provider val path = provider.tempPath() RARP(system).provider.registerTempActor(this, path) override def getParent = provider.tempContainer override def sendSystemMessage(message: SystemMessage): Unit = { message match { case Failed(_, _, seq) ⇒ testActor ! seq case _ ⇒ } } } class SystemMessageSender(val msgCount: Int, val target: ActorRef) extends Actor { var counter = 0 val targetRef = target.asInstanceOf[InternalActorRef] override def preStart(): Unit = self ! "sendnext" override def receive = { case "sendnext" ⇒ targetRef.sendSystemMessage(Failed(null, null, counter)) counter += 1 if (counter < msgCount) self ! "sendnext" } } } abstract class SystemMessageDeliveryStressTest(msg: String, cfg: String) extends AkkaSpec(ConfigFactory.parseString(cfg).withFallback(SystemMessageDeliveryStressTest.baseConfig)) with ImplicitSender with DefaultTimeout { import SystemMessageDeliveryStressTest._ val systemB = ActorSystem("systemB", system.settings.config) val sysMsgVerifier = new SystemMessageSequenceVerifier(system, testActor) val MsgCount = 100 val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress val root = RootActorPath(address) // We test internals here (system message delivery) so we are allowed to cheat val there = RARP(systemB).provider.resolveActorRef(root / "temp" / sysMsgVerifier.path.name).asInstanceOf[InternalActorRef] override def atStartup() = { system.eventStream.publish(TestEvent.Mute( EventFilter.error(start = "AssociationError"), EventFilter.warning(pattern = "received dead letter.*"))) systemB.eventStream.publish(TestEvent.Mute( EventFilter[EndpointException](), EventFilter.error(start = "AssociationError"), EventFilter.warning(pattern = "received dead letter.*"))) } "Remoting " + msg must { "guaranteed delivery and message ordering despite packet loss " taggedAs TimingTest in { Await.result(RARP(systemB).provider.transport.managementCommand(One(address, Drop(0.3, 0.3))), 3.seconds.dilated) systemB.actorOf(Props(classOf[SystemMessageSender], MsgCount, there)) val toSend = (0 until MsgCount).toList val received = expectMsgAllOf(45.seconds, toSend: _*) received should be(toSend) } } override def beforeTermination() { system.eventStream.publish(TestEvent.Mute( EventFilter.warning(source = "akka://AkkaProtocolStressTest/user/$a", start = "received dead letter"), EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)"))) systemB.eventStream.publish(TestEvent.Mute( EventFilter[EndpointException](), EventFilter.error(start = "AssociationError"), EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)"))) } override def afterTermination(): Unit = shutdown(systemB) } class SystemMessageDeliveryRetryGate extends SystemMessageDeliveryStressTest("passive connections on", "akka.remote.retry-gate-closed-for = 0.5 s") class SystemMessageDeliveryNoPassiveRetryGate extends SystemMessageDeliveryStressTest("passive connections off", """ akka.remote.use-passive-connections = off akka.remote.retry-gate-closed-for = 0.5 s """) Other Akka source code examplesHere is a short list of links related to this Akka SystemMessageDeliveryStressTest.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.