alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (RemoteWatcherSpec.scala)

This example Akka source code file (RemoteWatcherSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, address, addressterminated, akka, concurrent, heartbeattick, reapunreachabletick, some, stats, test, testing, testprobe, time, turnoff, unwatchremote, watchremote

The RemoteWatcherSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.remote

import language.postfixOps
import scala.concurrent.duration._
import akka.testkit._
import akka.actor.ActorSystem
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.ExtendedActorSystem
import akka.actor.RootActorPath
import akka.actor.Identify
import akka.actor.ActorIdentity
import akka.actor.PoisonPill
import akka.actor.Address

object RemoteWatcherSpec {

  class TestActorProxy(testActor: ActorRef) extends Actor {
    def receive = {
      case msg ⇒ testActor forward msg
    }
  }

  class MyActor extends Actor {
    def receive = Actor.emptyBehavior
  }

  // turn off all periodic activity
  val TurnOff = 5.minutes

  def createFailureDetector(): FailureDetectorRegistry[Address] = {
    def createFailureDetector(): FailureDetector =
      new PhiAccrualFailureDetector(
        threshold = 8.0,
        maxSampleSize = 200,
        minStdDeviation = 100.millis,
        acceptableHeartbeatPause = 3.seconds,
        firstHeartbeatEstimate = 1.second)

    new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector())
  }

  object TestRemoteWatcher {
    final case class AddressTerm(address: Address)
    final case class Quarantined(address: Address, uid: Option[Int])
  }

  class TestRemoteWatcher(heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher(createFailureDetector,
    heartbeatInterval = TurnOff,
    unreachableReaperInterval = TurnOff,
    heartbeatExpectedResponseAfter = heartbeatExpectedResponseAfter) {

    def this() = this(heartbeatExpectedResponseAfter = TurnOff)

    override def publishAddressTerminated(address: Address): Unit =
      // don't publish the real AddressTerminated, but a testable message,
      // that doesn't interfere with the real watch that is going on in the background
      context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address))

    override def quarantine(address: Address, uid: Option[Int]): Unit = {
      // don't quarantine in remoting, but publish a testable message
      context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
    }

  }

}

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RemoteWatcherSpec extends AkkaSpec(
  """akka {
       loglevel = INFO
       log-dead-letters-during-shutdown = false
       actor.provider = "akka.remote.RemoteActorRefProvider"
       remote.netty.tcp {
         hostname = localhost
         port = 0
       }
     }""") with ImplicitSender {

  import RemoteWatcherSpec._
  import RemoteWatcher._

  override def expectedTestDuration = 2.minutes

  val remoteSystem = ActorSystem("RemoteSystem", system.settings.config)
  val remoteAddress = remoteSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
  def remoteAddressUid = AddressUidExtension(remoteSystem).addressUid

  Seq(system, remoteSystem).foreach(muteDeadLetters(
    akka.remote.transport.AssociationHandle.Disassociated.getClass,
    akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass)(_))

  override def afterTermination() {
    shutdown(remoteSystem)
  }

  val heartbeatRspB = HeartbeatRsp(remoteAddressUid)

  def createRemoteActor(props: Props, name: String): ActorRef = {
    remoteSystem.actorOf(props, name)
    system.actorSelection(RootActorPath(remoteAddress) / "user" / name) ! Identify(name)
    expectMsgType[ActorIdentity].ref.get
  }

  "A RemoteWatcher" must {

    "have correct interaction when watching" in {

      val fd = createFailureDetector()
      val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor1")
      val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor1")

      val a1 = system.actorOf(Props[MyActor], "a1")
      val a2 = system.actorOf(Props[MyActor], "a2")
      val b1 = createRemoteActor(Props[MyActor], "b1")
      val b2 = createRemoteActor(Props[MyActor], "b2")

      monitorA ! WatchRemote(b1, a1)
      monitorA ! WatchRemote(b2, a1)
      monitorA ! WatchRemote(b2, a2)
      monitorA ! Stats
      // for each watchee the RemoteWatcher also adds its own watch: 5 = 3 + 2
      // (a1->b1), (a1->b2), (a2->b2)
      expectMsg(Stats.counts(watching = 5, watchingNodes = 1))
      expectNoMsg(100 millis)
      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      expectNoMsg(100 millis)
      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      expectNoMsg(100 millis)
      monitorA.tell(heartbeatRspB, monitorB)
      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      expectNoMsg(100 millis)

      monitorA ! UnwatchRemote(b1, a1)
      // still (a1->b2) and (a2->b2) left
      monitorA ! Stats
      expectMsg(Stats.counts(watching = 3, watchingNodes = 1))
      expectNoMsg(100 millis)
      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      expectNoMsg(100 millis)

      monitorA ! UnwatchRemote(b2, a2)
      // still (a1->b2) left
      monitorA ! Stats
      expectMsg(Stats.counts(watching = 2, watchingNodes = 1))
      expectNoMsg(100 millis)
      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      expectNoMsg(100 millis)

      monitorA ! UnwatchRemote(b2, a1)
      // all unwatched
      monitorA ! Stats
      expectMsg(Stats.empty)
      expectNoMsg(100 millis)
      monitorA ! HeartbeatTick
      expectNoMsg(100 millis)
      monitorA ! HeartbeatTick
      expectNoMsg(100 millis)

      // make sure nothing floods over to next test
      expectNoMsg(2 seconds)
    }

    "generate AddressTerminated when missing heartbeats" in {
      val p = TestProbe()
      val q = TestProbe()
      system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])
      system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined])

      val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor4")
      val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor4")

      val a = system.actorOf(Props[MyActor], "a4")
      val b = createRemoteActor(Props[MyActor], "b4")

      monitorA ! WatchRemote(b, a)

      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      monitorA.tell(heartbeatRspB, monitorB)
      expectNoMsg(1 second)
      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      monitorA.tell(heartbeatRspB, monitorB)

      within(10 seconds) {
        awaitAssert {
          monitorA ! HeartbeatTick
          expectMsg(Heartbeat)
          // but no HeartbeatRsp
          monitorA ! ReapUnreachableTick
          p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
          q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, Some(remoteAddressUid)))
        }
      }

      // make sure nothing floods over to next test
      expectNoMsg(2 seconds)
    }

    "generate AddressTerminated when missing first heartbeat" in {
      val p = TestProbe()
      val q = TestProbe()
      system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])
      system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined])

      val fd = createFailureDetector()
      val heartbeatExpectedResponseAfter = 2.seconds
      val monitorA = system.actorOf(Props(classOf[TestRemoteWatcher], heartbeatExpectedResponseAfter), "monitor5")
      val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor5")

      val a = system.actorOf(Props[MyActor], "a5")
      val b = createRemoteActor(Props[MyActor], "b5")

      monitorA ! WatchRemote(b, a)

      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      // no HeartbeatRsp sent

      within(20 seconds) {
        awaitAssert {
          monitorA ! HeartbeatTick
          expectMsg(Heartbeat)
          // but no HeartbeatRsp
          monitorA ! ReapUnreachableTick
          p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
          // no real quarantine when missing first heartbeat, uid unknown
          q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, None))
        }
      }

      // make sure nothing floods over to next test
      expectNoMsg(2 seconds)
    }

    "generate AddressTerminated for new watch after broken connection that was re-established and broken again" in {
      val p = TestProbe()
      val q = TestProbe()
      system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])
      system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined])

      val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor6")
      val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor6")

      val a = system.actorOf(Props[MyActor], "a6")
      val b = createRemoteActor(Props[MyActor], "b6")

      monitorA ! WatchRemote(b, a)

      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      monitorA.tell(heartbeatRspB, monitorB)
      expectNoMsg(1 second)
      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      monitorA.tell(heartbeatRspB, monitorB)

      within(10 seconds) {
        awaitAssert {
          monitorA ! HeartbeatTick
          expectMsg(Heartbeat)
          // but no HeartbeatRsp
          monitorA ! ReapUnreachableTick
          p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
          q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, Some(remoteAddressUid)))
        }
      }

      // real AddressTerminated would trigger Terminated for b6, simulate that here
      remoteSystem.stop(b)
      awaitAssert {
        monitorA ! Stats
        expectMsg(Stats.empty)
      }
      expectNoMsg(2 seconds)

      // assume that connection comes up again, or remote system is restarted
      val c = createRemoteActor(Props[MyActor], "c6")

      monitorA ! WatchRemote(c, a)

      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      monitorA.tell(heartbeatRspB, monitorB)
      expectNoMsg(1 second)
      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      monitorA.tell(heartbeatRspB, monitorB)
      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      monitorA ! ReapUnreachableTick
      p.expectNoMsg(1 second)
      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      monitorA.tell(heartbeatRspB, monitorB)
      monitorA ! HeartbeatTick
      expectMsg(Heartbeat)
      monitorA ! ReapUnreachableTick
      p.expectNoMsg(1 second)
      q.expectNoMsg(1 second)

      // then stop heartbeating again, should generate new AddressTerminated
      within(10 seconds) {
        awaitAssert {
          monitorA ! HeartbeatTick
          expectMsg(Heartbeat)
          // but no HeartbeatRsp
          monitorA ! ReapUnreachableTick
          p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(c.path.address))
          q.expectMsg(1 second, TestRemoteWatcher.Quarantined(c.path.address, Some(remoteAddressUid)))
        }
      }

      // make sure nothing floods over to next test
      expectNoMsg(2 seconds)
    }

  }

}

Other Akka source code examples

Here is a short list of links related to this Akka RemoteWatcherSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.