|
Akka/Scala example source code file (RemoteNodeShutdownAndComesBackSpec.scala)
The RemoteNodeShutdownAndComesBackSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.remote.testconductor.RoleName
import akka.remote.transport.ThrottlerTransportAdapter.{ ForceDisassociate, Direction }
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.actor.ActorIdentity
import akka.remote.testconductor.RoleName
import akka.actor.Identify
import scala.concurrent.Await
object RemoteNodeShutdownAndComesBackSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString("""
akka.loglevel = INFO
akka.remote.log-remote-lifecycle-events = INFO
## Keep it tight, otherwise reestablishing a connection takes too much time
akka.remote.transport-failure-detector.heartbeat-interval = 1 s
akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s
akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 60 s
akka.remote.gate-invalid-addresses-for = 0.5 s
""")))
testTransport(on = true)
class Subject extends Actor {
def receive = {
case "shutdown" ⇒ context.system.shutdown()
case msg ⇒ sender() ! msg
}
}
}
class RemoteNodeShutdownAndComesBackMultiJvmNode1 extends RemoteNodeShutdownAndComesBackSpec
class RemoteNodeShutdownAndComesBackMultiJvmNode2 extends RemoteNodeShutdownAndComesBackSpec
abstract class RemoteNodeShutdownAndComesBackSpec
extends MultiNodeSpec(RemoteNodeShutdownAndComesBackSpec)
with STMultiNodeSpec with ImplicitSender {
import RemoteNodeShutdownAndComesBackSpec._
override def initialParticipants = roles.size
def identify(role: RoleName, actorName: String): ActorRef = {
system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName)
expectMsgType[ActorIdentity].ref.get
}
"RemoteNodeShutdownAndComesBack" must {
"properly reset system message buffer state when new system with same Address comes up" taggedAs LongRunningTest in {
runOn(first) {
val secondAddress = node(second).address
system.actorOf(Props[Subject], "subject1")
enterBarrier("actors-started")
val subject = identify(second, "subject")
val sysmsgBarrier = identify(second, "sysmsgBarrier")
// Prime up the system message buffer
watch(subject)
enterBarrier("watch-established")
// Wait for proper system message propagation
// (Using a helper actor to ensure that all previous system messages arrived)
watch(sysmsgBarrier)
system.stop(sysmsgBarrier)
expectTerminated(sysmsgBarrier)
// Drop all messages from this point so no SHUTDOWN is ever received
testConductor.blackhole(second, first, Direction.Send).await
// Shut down all existing connections so that the system can enter recovery mode (association attempts)
Await.result(RARP(system).provider.transport.managementCommand(ForceDisassociate(node(second).address)), 3.seconds)
// Trigger reconnect attempt and also queue up a system message to be in limbo state (UID of remote system
// is unknown, and system message is pending)
system.stop(subject)
// Get rid of old system -- now SHUTDOWN is lost
testConductor.shutdown(second).await
// At this point the second node is restarting, while the first node is trying to reconnect without resetting
// the system message send state
// Now wait until second system becomes alive again
within(30.seconds) {
// retry because the Subject actor might not be started yet
awaitAssert {
val p = TestProbe()
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject").tell(Identify("subject"), p.ref)
p.expectMsgPF(1 second) {
case ActorIdentity("subject", Some(ref)) ⇒ true
}
}
}
expectTerminated(subject)
// Establish watch with the new system. This triggers additional system message traffic. If buffers are out
// of synch the remote system will be quarantined and the rest of the test will fail (or even in earlier
// stages depending on circumstances).
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! Identify("subject")
val subjectNew = expectMsgType[ActorIdentity].ref.get
watch(subjectNew)
subjectNew ! "shutdown"
fishForMessage(5.seconds) {
case _: ActorIdentity ⇒ false
case Terminated(subjectNew) ⇒ true
}
}
runOn(second) {
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
system.actorOf(Props[Subject], "subject")
system.actorOf(Props[Subject], "sysmsgBarrier")
val path = node(first)
enterBarrier("actors-started")
enterBarrier("watch-established")
system.awaitTermination(30.seconds)
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
akka.remote.netty.tcp {
hostname = ${addr.host.get}
port = ${addr.port.get}
}
""").withFallback(system.settings.config))
freshSystem.actorOf(Props[Subject], "subject")
freshSystem.awaitTermination(30.seconds)
}
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka RemoteNodeShutdownAndComesBackSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.