|
Akka/Scala example source code file (RemoteDeathWatchSpec.scala)
The RemoteDeathWatchSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.remote import akka.testkit._ import akka.actor._ import com.typesafe.config.ConfigFactory import akka.actor.RootActorPath import scala.concurrent.duration._ import akka.TestUtils import akka.event.Logging.Warning @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class RemoteDeathWatchSpec extends AkkaSpec(ConfigFactory.parseString(""" akka { actor { provider = "akka.remote.RemoteActorRefProvider" deployment { /watchers.remote = "akka.tcp://other@localhost:2666" } } remote.retry-gate-closed-for = 1 s remote.initial-system-message-delivery-timeout = 3 s remote.netty.tcp { hostname = "localhost" port = 0 } } """)) with ImplicitSender with DefaultTimeout with DeathWatchSpec { val other = ActorSystem("other", ConfigFactory.parseString("akka.remote.netty.tcp.port=2666") .withFallback(system.settings.config)) override def beforeTermination() { system.eventStream.publish(TestEvent.Mute( EventFilter.warning(pattern = "received dead letter.*Disassociate"))) } override def afterTermination() { shutdown(other) } override def expectedTestDuration: FiniteDuration = 120.seconds "receive Terminated when system of de-serialized ActorRef is not running" in { val probe = TestProbe() system.eventStream.subscribe(probe.ref, classOf[QuarantinedEvent]) val rarp = RARP(system).provider // pick an unused port val port = TestUtils.temporaryServerAddress().getPort // simulate de-serialized ActorRef val ref = rarp.resolveActorRef(s"akka.tcp://OtherSystem@localhost:$port/user/foo/bar#1752527294") system.actorOf(Props(new Actor { context.watch(ref) def receive = { case Terminated(r) ⇒ testActor ! r } }).withDeploy(Deploy.local)) expectMsg(20.seconds, ref) // we don't expect real quarantine when the UID is unknown, i.e. QuarantinedEvent is not published probe.expectNoMsg(3.seconds) // The following verifies ticket #3870, i.e. make sure that re-delivery of Watch message is stopped. // It was observed as periodic logging of "address is now gated" when the gate was lifted. system.eventStream.subscribe(probe.ref, classOf[Warning]) probe.expectNoMsg(rarp.remoteSettings.RetryGateClosedFor * 2) } "receive Terminated when watched node is unknown host" in { val path = RootActorPath(Address("akka.tcp", system.name, "unknownhost", 2552)) / "user" / "subject" system.actorOf(Props(new Actor { context.watch(context.actorFor(path)) def receive = { case t: Terminated ⇒ testActor ! t.actor.path } }).withDeploy(Deploy.local), name = "observer2") expectMsg(60.seconds, path) } "receive ActorIdentity(None) when identified node is unknown host" in { val path = RootActorPath(Address("akka.tcp", system.name, "unknownhost2", 2552)) / "user" / "subject" system.actorSelection(path) ! Identify(path) expectMsg(60.seconds, ActorIdentity(path, None)) } "quarantine systems after unsuccessful system message delivery if have not communicated before" in { // Synthesize an ActorRef to a remote system this one has never talked to before. // This forces ReliableDeliverySupervisor to start with unknown remote system UID. val extinctPath = RootActorPath(Address("akka.tcp", "extinct-system", "localhost", TestUtils.temporaryServerAddress().getPort)) / "user" / "noone" val transport = RARP(system).provider.transport val extinctRef = new RemoteActorRef(transport, transport.localAddressForRemote(extinctPath.address), extinctPath, Nobody, props = None, deploy = None) val probe = TestProbe() probe.watch(extinctRef) probe.unwatch(extinctRef) probe.expectNoMsg(5.seconds) system.eventStream.subscribe(probe.ref, classOf[Warning]) probe.expectNoMsg(RARP(system).provider.remoteSettings.RetryGateClosedFor * 2) } } Other Akka source code examplesHere is a short list of links related to this Akka RemoteDeathWatchSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.