Akka/Scala example source code file (RemoteDeathWatchSpec.scala)

This example Akka source code file (RemoteDeathWatchSpec.scala) is included in my "Source Code Warehouse" project.

The RemoteDeathWatchSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.remote

import akka.TestUtils
import akka.actor._
import akka.event.Logging.Warning
import akka.testkit._
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

/**
 * Death-watch tests that run with the remoting layer enabled.
 *
 * The spec mixes in `DeathWatchSpec` and configures this system with
 * `akka.remote.RemoteActorRefProvider`; actors created under `/watchers` are deployed to a
 * second actor system named "other" that listens on port 2666. On top of the mixed-in
 * scenarios it adds cases that watch or identify actors on systems that are not running
 * or whose host cannot be resolved.
 *
 * NOTE(review): this class was restored from a damaged copy in which every closing brace
 * and a number of statements were missing. Statements tagged "restored" below were
 * inferred from the surrounding code and should be confirmed against the upstream Akka
 * sources.
 */
class RemoteDeathWatchSpec extends AkkaSpec(ConfigFactory.parseString("""
akka {
    actor {
        provider = "akka.remote.RemoteActorRefProvider"
        deployment {
            /watchers.remote = "akka.tcp://other@localhost:2666"
        }
    }
    remote.retry-gate-closed-for = 1 s
    remote.initial-system-message-delivery-timeout = 3 s
    remote.netty.tcp {
        hostname = "localhost"
        port = 0
    }
}
""")) with ImplicitSender with DefaultTimeout with DeathWatchSpec {

  // Second actor system that hosts the remotely deployed "/watchers" actors. It reuses this
  // system's configuration (restored: fallback) and only pins the listening port to 2666 so
  // that it matches the deployment address above.
  val other = ActorSystem("other", ConfigFactory.parseString("akka.remote.netty.tcp.port=2666")
    .withFallback(system.settings.config))

  /**
   * Mutes the dead-letter warnings about `Disassociate` so they do not clutter the test log.
   * Presumably invoked by the test kit before the actor system is terminated.
   */
  override def beforeTermination(): Unit = {
    // restored: the filter is installed by publishing a Mute event on the event stream
    system.eventStream.publish(TestEvent.Mute(
      EventFilter.warning(pattern = "received dead letter.*Disassociate")))
  }

  /**
   * Shuts down the auxiliary "other" actor system created by this spec.
   */
  override def afterTermination(): Unit = {
    // restored: without this the second actor system would be leaked after the test run
    shutdown(other)
  }

  // Remote failure detection involves long timeouts, so allow the whole spec two minutes.
  override def expectedTestDuration: FiniteDuration = 120.seconds

  "receive Terminated when system of de-serialized ActorRef is not running" in {
    val probe = TestProbe()
    system.eventStream.subscribe(probe.ref, classOf[QuarantinedEvent])
    val rarp = RARP(system).provider
    // pick an unused port
    val port = TestUtils.temporaryServerAddress().getPort
    // simulate de-serialized ActorRef
    val ref = rarp.resolveActorRef(s"akka.tcp://OtherSystem@localhost:$port/user/foo/bar#1752527294")
    system.actorOf(Props(new Actor {
      // restored: the watcher has to register the watch in order to ever see Terminated
      context.watch(ref)
      def receive = {
        case Terminated(r) ⇒ testActor ! r
      }
    }).withDeploy(Deploy.local))

    expectMsg(20.seconds, ref)
    // we don't expect real quarantine when the UID is unknown, i.e. QuarantinedEvent is not published
    probe.expectNoMsg(3.seconds) // restored: the assertion the comment above refers to
    // The following verifies ticket #3870, i.e. make sure that re-delivery of Watch message is stopped.
    // It was observed as periodic logging of "address is now gated" when the gate was lifted.
    system.eventStream.subscribe(probe.ref, classOf[Warning])
    probe.expectNoMsg(rarp.remoteSettings.RetryGateClosedFor * 2)
  }

  "receive Terminated when watched node is unknown host" in {
    // restored: the actor-system name argument of Address was missing
    val path = RootActorPath(Address("akka.tcp", system.name, "unknownhost", 2552)) / "user" / "subject"
    system.actorOf(Props(new Actor {
      // restored: watch a reference that points at the unresolvable host
      context.watch(context.actorFor(path))
      def receive = {
        // restored: forward the terminated actor's path, which is what expectMsg checks below
        case t: Terminated ⇒ testActor ! t.actor.path
      }
    }).withDeploy(Deploy.local), name = "observer2")

    expectMsg(60.seconds, path)
  }

  "receive ActorIdentity(None) when identified node is unknown host" in {
    // restored: the actor-system name argument of Address was missing
    val path = RootActorPath(Address("akka.tcp", system.name, "unknownhost2", 2552)) / "user" / "subject"
    system.actorSelection(path) ! Identify(path)
    expectMsg(60.seconds, ActorIdentity(path, None))
  }

  "quarantine systems after unsuccessful system message delivery if have not communicated before" in {
    // Synthesize an ActorRef to a remote system this one has never talked to before.
    // This forces ReliableDeliverySupervisor to start with unknown remote system UID.
    val extinctPath = RootActorPath(Address("akka.tcp", "extinct-system", "localhost", TestUtils.temporaryServerAddress().getPort)) / "user" / "noone"
    val transport = RARP(system).provider.transport
    val extinctRef = new RemoteActorRef(transport, transport.localAddressForRemote(extinctPath.address),
      extinctPath, Nobody, props = None, deploy = None)

    val probe = TestProbe()
    // restored: watch/unwatch makes the probe send system messages to the extinct system,
    // which is the delivery attempt this test is about
    probe.watch(extinctRef)
    probe.unwatch(extinctRef)

    probe.expectNoMsg(5.seconds) // restored
    // After the initial delivery attempt has been given up, no further warnings (e.g. about
    // the address being gated again) may be logged.
    system.eventStream.subscribe(probe.ref, classOf[Warning])
    probe.expectNoMsg(RARP(system).provider.remoteSettings.RetryGateClosedFor * 2)
  }

}


