|
Akka/Scala example source code file (RemoteWatcherSpec.scala)
The RemoteWatcherSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import language.postfixOps
import scala.concurrent.duration._
import akka.testkit._
import akka.actor.ActorSystem
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.ExtendedActorSystem
import akka.actor.RootActorPath
import akka.actor.Identify
import akka.actor.ActorIdentity
import akka.actor.PoisonPill
import akka.actor.Address
object RemoteWatcherSpec {
class TestActorProxy(testActor: ActorRef) extends Actor {
def receive = {
case msg ⇒ testActor forward msg
}
}
class MyActor extends Actor {
def receive = Actor.emptyBehavior
}
// turn off all periodic activity
val TurnOff = 5.minutes
def createFailureDetector(): FailureDetectorRegistry[Address] = {
def createFailureDetector(): FailureDetector =
new PhiAccrualFailureDetector(
threshold = 8.0,
maxSampleSize = 200,
minStdDeviation = 100.millis,
acceptableHeartbeatPause = 3.seconds,
firstHeartbeatEstimate = 1.second)
new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector())
}
object TestRemoteWatcher {
final case class AddressTerm(address: Address)
final case class Quarantined(address: Address, uid: Option[Int])
}
class TestRemoteWatcher(heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher(createFailureDetector,
heartbeatInterval = TurnOff,
unreachableReaperInterval = TurnOff,
heartbeatExpectedResponseAfter = heartbeatExpectedResponseAfter) {
def this() = this(heartbeatExpectedResponseAfter = TurnOff)
override def publishAddressTerminated(address: Address): Unit =
// don't publish the real AddressTerminated, but a testable message,
// that doesn't interfere with the real watch that is going on in the background
context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address))
override def quarantine(address: Address, uid: Option[Int]): Unit = {
// don't quarantine in remoting, but publish a testable message
context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RemoteWatcherSpec extends AkkaSpec(
"""akka {
loglevel = INFO
log-dead-letters-during-shutdown = false
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.tcp {
hostname = localhost
port = 0
}
}""") with ImplicitSender {
import RemoteWatcherSpec._
import RemoteWatcher._
override def expectedTestDuration = 2.minutes
val remoteSystem = ActorSystem("RemoteSystem", system.settings.config)
val remoteAddress = remoteSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
def remoteAddressUid = AddressUidExtension(remoteSystem).addressUid
Seq(system, remoteSystem).foreach(muteDeadLetters(
akka.remote.transport.AssociationHandle.Disassociated.getClass,
akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass)(_))
override def afterTermination() {
shutdown(remoteSystem)
}
val heartbeatRspB = HeartbeatRsp(remoteAddressUid)
def createRemoteActor(props: Props, name: String): ActorRef = {
remoteSystem.actorOf(props, name)
system.actorSelection(RootActorPath(remoteAddress) / "user" / name) ! Identify(name)
expectMsgType[ActorIdentity].ref.get
}
"A RemoteWatcher" must {
"have correct interaction when watching" in {
val fd = createFailureDetector()
val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor1")
val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor1")
val a1 = system.actorOf(Props[MyActor], "a1")
val a2 = system.actorOf(Props[MyActor], "a2")
val b1 = createRemoteActor(Props[MyActor], "b1")
val b2 = createRemoteActor(Props[MyActor], "b2")
monitorA ! WatchRemote(b1, a1)
monitorA ! WatchRemote(b2, a1)
monitorA ! WatchRemote(b2, a2)
monitorA ! Stats
// for each watchee the RemoteWatcher also adds its own watch: 5 = 3 + 2
// (a1->b1), (a1->b2), (a2->b2)
expectMsg(Stats.counts(watching = 5, watchingNodes = 1))
expectNoMsg(100 millis)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
expectNoMsg(100 millis)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
expectNoMsg(100 millis)
monitorA.tell(heartbeatRspB, monitorB)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
expectNoMsg(100 millis)
monitorA ! UnwatchRemote(b1, a1)
// still (a1->b2) and (a2->b2) left
monitorA ! Stats
expectMsg(Stats.counts(watching = 3, watchingNodes = 1))
expectNoMsg(100 millis)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
expectNoMsg(100 millis)
monitorA ! UnwatchRemote(b2, a2)
// still (a1->b2) left
monitorA ! Stats
expectMsg(Stats.counts(watching = 2, watchingNodes = 1))
expectNoMsg(100 millis)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
expectNoMsg(100 millis)
monitorA ! UnwatchRemote(b2, a1)
// all unwatched
monitorA ! Stats
expectMsg(Stats.empty)
expectNoMsg(100 millis)
monitorA ! HeartbeatTick
expectNoMsg(100 millis)
monitorA ! HeartbeatTick
expectNoMsg(100 millis)
// make sure nothing floods over to next test
expectNoMsg(2 seconds)
}
"generate AddressTerminated when missing heartbeats" in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])
system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined])
val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor4")
val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor4")
val a = system.actorOf(Props[MyActor], "a4")
val b = createRemoteActor(Props[MyActor], "b4")
monitorA ! WatchRemote(b, a)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA.tell(heartbeatRspB, monitorB)
expectNoMsg(1 second)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA.tell(heartbeatRspB, monitorB)
within(10 seconds) {
awaitAssert {
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
// but no HeartbeatRsp
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, Some(remoteAddressUid)))
}
}
// make sure nothing floods over to next test
expectNoMsg(2 seconds)
}
"generate AddressTerminated when missing first heartbeat" in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])
system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined])
val fd = createFailureDetector()
val heartbeatExpectedResponseAfter = 2.seconds
val monitorA = system.actorOf(Props(classOf[TestRemoteWatcher], heartbeatExpectedResponseAfter), "monitor5")
val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor5")
val a = system.actorOf(Props[MyActor], "a5")
val b = createRemoteActor(Props[MyActor], "b5")
monitorA ! WatchRemote(b, a)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
// no HeartbeatRsp sent
within(20 seconds) {
awaitAssert {
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
// but no HeartbeatRsp
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
// no real quarantine when missing first heartbeat, uid unknown
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, None))
}
}
// make sure nothing floods over to next test
expectNoMsg(2 seconds)
}
"generate AddressTerminated for new watch after broken connection that was re-established and broken again" in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])
system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined])
val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor6")
val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor6")
val a = system.actorOf(Props[MyActor], "a6")
val b = createRemoteActor(Props[MyActor], "b6")
monitorA ! WatchRemote(b, a)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA.tell(heartbeatRspB, monitorB)
expectNoMsg(1 second)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA.tell(heartbeatRspB, monitorB)
within(10 seconds) {
awaitAssert {
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
// but no HeartbeatRsp
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, Some(remoteAddressUid)))
}
}
// real AddressTerminated would trigger Terminated for b6, simulate that here
remoteSystem.stop(b)
awaitAssert {
monitorA ! Stats
expectMsg(Stats.empty)
}
expectNoMsg(2 seconds)
// assume that connection comes up again, or remote system is restarted
val c = createRemoteActor(Props[MyActor], "c6")
monitorA ! WatchRemote(c, a)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA.tell(heartbeatRspB, monitorB)
expectNoMsg(1 second)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA.tell(heartbeatRspB, monitorB)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA ! ReapUnreachableTick
p.expectNoMsg(1 second)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA.tell(heartbeatRspB, monitorB)
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
monitorA ! ReapUnreachableTick
p.expectNoMsg(1 second)
q.expectNoMsg(1 second)
// then stop heartbeating again, should generate new AddressTerminated
within(10 seconds) {
awaitAssert {
monitorA ! HeartbeatTick
expectMsg(Heartbeat)
// but no HeartbeatRsp
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(c.path.address))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(c.path.address, Some(remoteAddressUid)))
}
}
// make sure nothing floods over to next test
expectNoMsg(2 seconds)
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka RemoteWatcherSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.