|
Akka/Scala example source code file (ActorSystemSpec.scala)
The ActorSystemSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.actor

import language.postfixOps
import akka.testkit._
import org.scalatest.junit.JUnitSuiteLike
import com.typesafe.config.ConfigFactory
import scala.concurrent.{ ExecutionContext, Await, Future }
import scala.concurrent.duration._
import java.util.concurrent.{ RejectedExecutionException, ConcurrentLinkedQueue }
import akka.util.Timeout
import akka.japi.Util.immutableSeq
import akka.pattern.ask
import akka.dispatch._
import com.typesafe.config.Config
import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue, TimeUnit }
import akka.util.Switch
import akka.util.Helpers.ConfigOps

/** Runs the Java-side `JavaExtension` suite through ScalaTest's JUnit integration. */
class JavaExtensionSpec extends JavaExtension with JUnitSuiteLike

/** Extension id that creates a [[TestExtension]] for the given actor system. */
object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider {
  def lookup = this
  def createExtension(s: ExtendedActorSystem) = new TestExtension(s)
}

// Don't place inside ActorSystemSpec object, since it will not be garbage collected and reference to system remains
class TestExtension(val system: ExtendedActorSystem) extends Extension

/** Helper actors, dispatcher, execution context and configuration used by the spec below. */
object ActorSystemSpec {

  /**
   * On receiving an `Int` n, remembers the sender as `master`, creates and watches
   * n top-level [[Terminater]] actors and tells each of them to "run". When the last
   * watched actor has terminated it sends "done" to `master` and stops itself.
   */
  class Waves extends Actor {
    var master: ActorRef = _
    var terminaters = Set[ActorRef]()

    def receive = {
      case n: Int ⇒
        master = sender()
        terminaters = Set() ++ (for (i ← 1 to n) yield {
          val man = context.watch(context.system.actorOf(Props[Terminater]))
          man ! "run"
          man
        })
      case Terminated(child) if terminaters contains child ⇒
        terminaters -= child
        if (terminaters.isEmpty) {
          master ! "done"
          context stop self
        }
    }

    /** Reports the failure to `master` (if one has been recorded) and stops this actor instead of restarting. */
    override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {
      if (master ne null) {
        master ! "failed with " + cause + " while processing " + msg
      }
      context stop self
    }
  }

  /** Stops itself when it receives "run". */
  class Terminater extends Actor {
    def receive = {
      case "run" ⇒ context.stop(self)
    }
  }

  /** Supervisor strategy configurator whose strategy escalates every failure. */
  class Strategy extends SupervisorStrategyConfigurator {
    def create() = OneForOneStrategy() {
      case _ ⇒ SupervisorStrategy.Escalate
    }
  }

  /**
   * During construction creates a child, looks it up again by its path string,
   * sends the class of the looked-up reference to `testActor` and counts down `latch`.
   */
  final case class FastActor(latch: TestLatch, testActor: ActorRef) extends Actor {
    val ref1 = context.actorOf(Props.empty)
    val ref2 = context.actorFor(ref1.path.toString)
    testActor ! ref2.getClass
    latch.countDown()
    def receive = {
      case _ ⇒
    }
  }

  /**
   * Dispatcher configurator that always hands out one shared [[Dispatcher]]. On the
   * first `registerForExecution` call (guarded by the `doneIt` switch) the dispatcher
   * waits for the mailbox's actor instance to exist and then for the [[FastActor]]'s latch.
   */
  class SlowDispatcher(_config: Config, _prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(_config, _prerequisites) {
    private val instance = new Dispatcher(
      this,
      config.getString("id"),
      config.getInt("throughput"),
      config.getNanosDuration("throughput-deadline-time"),
      configureExecutor(),
      config.getMillisDuration("shutdown-timeout")) {
      val doneIt = new Switch
      override protected[akka] def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
        val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint)
        doneIt.switchOn {
          TestKit.awaitCond(mbox.actor.actor != null, 1.second)
          // Only FastActor is matched here; any other actor instance would raise a MatchError.
          mbox.actor.actor match {
            case FastActor(latch, _) ⇒ Await.ready(latch, 1.second)
          }
        }
        ret
      }
    }

    /**
     * Returns the same dispatcher instance for each invocation
     */
    override def dispatcher(): MessageDispatcher = instance
  }

  /**
   * Execution context that notifies `testActor` with "called" on every `execute` and
   * with "failed" on every `reportFailure`, then delegates to `underlying`.
   */
  class TestExecutionContext(testActor: ActorRef, underlying: ExecutionContext) extends ExecutionContext {

    def execute(runnable: Runnable): Unit = {
      testActor ! "called"
      underlying.execute(runnable)
    }

    def reportFailure(t: Throwable): Unit = {
      testActor ! "failed"
      underlying.reportFailure(t)
    }
  }

  // Each setting sits on its own line so the two entries stay separate in the parsed configuration.
  val config = s"""
      akka.extensions = ["akka.actor.TestExtension"]
      slow {
        type="${classOf[SlowDispatcher].getName}"
      }"""

}

/** Behavioural tests for `ActorSystem`: naming, extensions, termination, dispatchers and guardian strategies. */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSender {

  import ActorSystemSpec.FastActor

  "An ActorSystem" must {

    "use scala.concurrent.Future's InternalCallbackEC" in {
      system.asInstanceOf[ActorSystemImpl].internalCallingThreadExecutionContext.getClass.getName should be("scala.concurrent.Future$InternalCallbackExecutor$")
    }

    "reject invalid names" in {
      for (
        n ← Seq(
          "hallo_welt",
          "-hallowelt",
          "hallo*welt",
          "hallo@welt",
          "hallo#welt",
          "hallo$welt",
          "hallo%welt",
          "hallo/welt")
      ) intercept[IllegalArgumentException] {
        ActorSystem(n)
      }
    }

    "allow valid names" in {
      shutdown(ActorSystem("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-"))
    }

    "support extensions" in {
      // TestExtension is configured and should be loaded at startup
      system.hasExtension(TestExtension) should be(true)
      TestExtension(system).system should be(system)
      system.extension(TestExtension).system should be(system)
    }

    "log dead letters" in {
      val sys = ActorSystem("LogDeadLetters", ConfigFactory.parseString("akka.loglevel=INFO").withFallback(AkkaSpec.testConf))
      try {
        val a = sys.actorOf(Props[ActorSystemSpec.Terminater])
        watch(a)
        a ! "run"
        expectTerminated(a)
        EventFilter.info(pattern = "not delivered", occurrences = 1).intercept {
          a ! "boom"
        }(sys)
      } finally shutdown(sys)
    }

    "run termination callbacks in order" in {
      val system2 = ActorSystem("TerminationCallbacks", AkkaSpec.testConf)
      val result = new ConcurrentLinkedQueue[Int]
      val count = 10
      val latch = TestLatch(count)

      for (i ← 1 to count) {
        system2.registerOnTermination {
          Thread.sleep((i % 3).millis.dilated.toMillis)
          result add i
          latch.countDown()
        }
      }

      system2.shutdown()
      Await.ready(latch, 5 seconds)

      // The test expects the callbacks to have run in reverse registration order.
      val expected = (for (i ← 1 to count) yield i).reverse

      immutableSeq(result) should be(expected)
    }

    "awaitTermination after termination callbacks" in {
      val system2 = ActorSystem("AwaitTermination", AkkaSpec.testConf)
      @volatile
      var callbackWasRun = false

      system2.registerOnTermination {
        Thread.sleep(50.millis.dilated.toMillis)
        callbackWasRun = true
      }
      import system.dispatcher
      system2.scheduler.scheduleOnce(200.millis.dilated) { system2.shutdown() }

      system2.awaitTermination(5 seconds)
      callbackWasRun should be(true)
    }

    "return isTerminated status correctly" in {
      val system = ActorSystem()
      system.isTerminated should be(false)
      system.shutdown()
      system.awaitTermination(10 seconds)
      system.isTerminated should be(true)
    }

    "throw RejectedExecutionException when shutdown" in {
      val system2 = ActorSystem("AwaitTermination", AkkaSpec.testConf)
      system2.shutdown()
      system2.awaitTermination(10 seconds)

      intercept[RejectedExecutionException] {
        system2.registerOnTermination { println("IF YOU SEE THIS THEN THERE'S A BUG HERE") }
      }.getMessage should be("Must be called prior to system shutdown.")
    }

    "reliably create waves of actors" in {
      import system.dispatcher
      implicit val timeout = Timeout((20 seconds).dilated)
      val waves = for (i ← 1 to 3) yield system.actorOf(Props[ActorSystemSpec.Waves]) ? 50000
      Await.result(Future.sequence(waves), timeout.duration + 5.seconds) should be(Seq("done", "done", "done"))
    }

    "find actors that just have been created" in {
      system.actorOf(Props(new FastActor(TestLatch(), testActor)).withDispatcher("slow"))
      expectMsgType[Class[_]] should be(classOf[LocalActorRef])
    }

    "reliable deny creation of actors while shutting down" in {
      val system = ActorSystem()
      import system.dispatcher
      system.scheduler.scheduleOnce(200 millis) { system.shutdown() }
      var failing = false
      var created = Vector.empty[ActorRef]
      while (!system.isTerminated) {
        try {
          val t = system.actorOf(Props[ActorSystemSpec.Terminater])
          failing should not be true // because once failing => always failing (it’s due to shutdown)
          created :+= t
        } catch {
          case _: IllegalStateException ⇒ failing = true
        }

        if (!failing && system.uptime >= 5) {
          // lastOption: an empty `created` must not throw here and hide the real failure below.
          created.lastOption foreach println
          println(system.asInstanceOf[ExtendedActorSystem].printTree)
          fail("System didn't terminate within 5 seconds")
        }
      }

      created filter (ref ⇒ !ref.isTerminated && !ref.asInstanceOf[ActorRefWithCell].underlying.isInstanceOf[UnstartedCell]) should be(Seq())
    }

    "shut down when /user fails" in {
      implicit val system = ActorSystem("Stop", AkkaSpec.testConf)
      EventFilter[ActorKilledException]() intercept {
        system.actorSelection("/user") ! Kill
        awaitCond(system.isTerminated)
      }
    }

    "allow configuration of guardian supervisor strategy" in {
      implicit val system = ActorSystem("Stop",
        ConfigFactory.parseString("akka.actor.guardian-supervisor-strategy=akka.actor.StoppingSupervisorStrategy")
          .withFallback(AkkaSpec.testConf))
      // try/finally so the extra system is shut down even when an assertion fails.
      try {
        val a = system.actorOf(Props(new Actor {
          def receive = {
            case "die" ⇒ throw new Exception("hello")
          }
        }))
        val probe = TestProbe()
        probe.watch(a)
        EventFilter[Exception]("hello", occurrences = 1) intercept {
          a ! "die"
        }
        val t = probe.expectMsg(Terminated(a)(existenceConfirmed = true, addressTerminated = false))
        t.existenceConfirmed should be(true)
        t.addressTerminated should be(false)
      } finally shutdown(system)
    }

    "shut down when /user escalates" in {
      implicit val system = ActorSystem("Stop",
        ConfigFactory.parseString("akka.actor.guardian-supervisor-strategy=\"akka.actor.ActorSystemSpec$Strategy\"")
          .withFallback(AkkaSpec.testConf))
      val a = system.actorOf(Props(new Actor {
        def receive = {
          case "die" ⇒ throw new Exception("hello")
        }
      }))
      EventFilter[Exception]("hello") intercept {
        a ! "die"
        awaitCond(system.isTerminated)
      }
    }

    "work with a passed in ExecutionContext" in {
      val ecProbe = TestProbe()
      val ec = new ActorSystemSpec.TestExecutionContext(ecProbe.ref, ExecutionContexts.global())

      val system2 = ActorSystem(name = "default", defaultExecutionContext = Some(ec))

      try {
        val ref = system2.actorOf(Props(new Actor {
          def receive = {
            case "ping" ⇒ sender() ! "pong"
          }
        }))

        val probe = TestProbe()

        ref.tell("ping", probe.ref)

        ecProbe.expectMsg(1.second, "called")
        probe.expectMsg(1.second, "pong")
      } finally {
        shutdown(system2)
      }
    }

    "not use passed in ExecutionContext if executor is configured" in {
      val ecProbe = TestProbe()
      val ec = new ActorSystemSpec.TestExecutionContext(ecProbe.ref, ExecutionContexts.global())

      val config = ConfigFactory.parseString("akka.actor.default-dispatcher.executor = \"fork-join-executor\"")
      val system2 = ActorSystem(name = "default", config = Some(config), defaultExecutionContext = Some(ec))

      try {
        val ref = system2.actorOf(Props(new Actor {
          def receive = {
            case "ping" ⇒ sender() ! "pong"
          }
        }))

        val probe = TestProbe()

        ref.tell("ping", probe.ref)

        ecProbe.expectNoMsg()
        probe.expectMsg(1.second, "pong")
      } finally {
        shutdown(system2)
      }
    }
  }

}

Other Akka source code examples

Here is a short list of links related to this Akka ActorSystemSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.