|
Akka/Scala example source code file (ActorModelSpec.scala)
The ActorModelSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.dispatch
import language.postfixOps
import java.rmi.RemoteException
import java.util.concurrent.{ TimeUnit, CountDownLatch, ConcurrentHashMap }
import java.util.concurrent.atomic.{ AtomicLong, AtomicInteger }
import org.junit.runner.RunWith
import org.scalatest.Assertions._
import org.scalatest.junit.JUnitRunner
import com.typesafe.config.Config
import akka.actor._
import akka.dispatch.sysmsg._
import akka.dispatch._
import akka.event.Logging.Error
import akka.pattern.ask
import akka.testkit._
import akka.util.Helpers.ConfigOps
import akka.util.Switch
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future, Promise }
import scala.annotation.tailrec
object ActorModelSpec {
sealed trait ActorModelMessage extends NoSerializationVerificationNeeded
final case class TryReply(expect: Any) extends ActorModelMessage
final case class Reply(expect: Any) extends ActorModelMessage
final case class Forward(to: ActorRef, msg: Any) extends ActorModelMessage
final case class CountDown(latch: CountDownLatch) extends ActorModelMessage
final case class Increment(counter: AtomicLong) extends ActorModelMessage
final case class AwaitLatch(latch: CountDownLatch) extends ActorModelMessage
final case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage
final case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage
final case class Wait(time: Long) extends ActorModelMessage
final case class WaitAck(time: Long, latch: CountDownLatch) extends ActorModelMessage
case object Interrupt extends ActorModelMessage
final case class InterruptNicely(expect: Any) extends ActorModelMessage
case object Restart extends ActorModelMessage
case object DoubleStop extends ActorModelMessage
final case class ThrowException(e: Throwable) extends ActorModelMessage
val Ping = "Ping"
val Pong = "Pong"
class DispatcherActor extends Actor {
private val busy = new Switch(false)
def interceptor = context.dispatcher.asInstanceOf[MessageDispatcherInterceptor]
def ack(): Unit = {
if (!busy.switchOn(())) {
throw new Exception("isolation violated")
} else {
interceptor.getStats(self).msgsProcessed.incrementAndGet()
}
}
override def postRestart(reason: Throwable) {
interceptor.getStats(self).restarts.incrementAndGet()
}
def receive = {
case AwaitLatch(latch) ⇒ { ack(); latch.await(); busy.switchOff(()) }
case Meet(sign, wait) ⇒ { ack(); sign.countDown(); wait.await(); busy.switchOff(()) }
case Wait(time) ⇒ { ack(); Thread.sleep(time); busy.switchOff(()) }
case WaitAck(time, l) ⇒ { ack(); Thread.sleep(time); l.countDown(); busy.switchOff(()) }
case Reply(msg) ⇒ { ack(); sender() ! msg; busy.switchOff(()) }
case TryReply(msg) ⇒ { ack(); sender().tell(msg, null); busy.switchOff(()) }
case Forward(to, msg) ⇒ { ack(); to.forward(msg); busy.switchOff(()) }
case CountDown(latch) ⇒ { ack(); latch.countDown(); busy.switchOff(()) }
case Increment(count) ⇒ { ack(); count.incrementAndGet(); busy.switchOff(()) }
case CountDownNStop(l) ⇒ { ack(); l.countDown(); context.stop(self); busy.switchOff(()) }
case Restart ⇒ { ack(); busy.switchOff(()); throw new Exception("Restart requested") }
case Interrupt ⇒ { ack(); sender() ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(()); throw new InterruptedException("Ping!") }
case InterruptNicely(msg) ⇒ { ack(); sender() ! msg; busy.switchOff(()); Thread.currentThread().interrupt() }
case ThrowException(e: Throwable) ⇒ { ack(); busy.switchOff(()); throw e }
case DoubleStop ⇒ { ack(); context.stop(self); context.stop(self); busy.switchOff }
}
}
class InterceptorStats {
val suspensions = new AtomicLong(0)
val resumes = new AtomicLong(0)
val registers = new AtomicLong(0)
val unregisters = new AtomicLong(0)
val msgsReceived = new AtomicLong(0)
val msgsProcessed = new AtomicLong(0)
val restarts = new AtomicLong(0)
override def toString = "InterceptorStats(susp=" + suspensions +
",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters +
",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts
}
trait MessageDispatcherInterceptor extends MessageDispatcher {
val stats = new ConcurrentHashMap[ActorRef, InterceptorStats]
val stops = new AtomicLong(0)
def getStats(actorRef: ActorRef) = {
val is = new InterceptorStats
stats.putIfAbsent(actorRef, is) match {
case null ⇒ is
case other ⇒ other
}
}
protected[akka] abstract override def suspend(actor: ActorCell) {
getStats(actor.self).suspensions.incrementAndGet()
super.suspend(actor)
}
protected[akka] abstract override def resume(actor: ActorCell) {
super.resume(actor)
getStats(actor.self).resumes.incrementAndGet()
}
protected[akka] abstract override def register(actor: ActorCell) {
assert(getStats(actor.self).registers.incrementAndGet() == 1)
super.register(actor)
}
protected[akka] abstract override def unregister(actor: ActorCell) {
assert(getStats(actor.self).unregisters.incrementAndGet() == 1)
super.unregister(actor)
}
protected[akka] abstract override def dispatch(receiver: ActorCell, invocation: Envelope) {
val stats = getStats(receiver.self)
stats.msgsReceived.incrementAndGet()
super.dispatch(receiver, invocation)
}
protected[akka] abstract override def shutdown() {
stops.incrementAndGet()
super.shutdown()
}
}
def assertDispatcher(dispatcher: MessageDispatcherInterceptor)(
stops: Long = dispatcher.stops.get())(implicit system: ActorSystem) {
val deadline = System.currentTimeMillis + dispatcher.shutdownTimeout.toMillis * 5
try {
await(deadline)(stops == dispatcher.stops.get)
} catch {
case e: Throwable ⇒
system.eventStream.publish(Error(e, dispatcher.toString, dispatcher.getClass, "actual: stops=" + dispatcher.stops.get +
" required: stops=" + stops))
throw e
}
}
def assertCountDown(latch: CountDownLatch, wait: Long, hint: String) {
if (!latch.await(wait, TimeUnit.MILLISECONDS))
fail("Failed to count down within " + wait + " millis (count at " + latch.getCount + "). " + hint)
}
def assertNoCountDown(latch: CountDownLatch, wait: Long, hint: String) {
if (latch.await(wait, TimeUnit.MILLISECONDS))
fail("Expected count down to fail after " + wait + " millis. " + hint)
}
def statsFor(actorRef: ActorRef, dispatcher: MessageDispatcher = null) =
dispatcher.asInstanceOf[MessageDispatcherInterceptor].getStats(actorRef)
def assertRefDefaultZero(actorRef: ActorRef, dispatcher: MessageDispatcher = null)(
suspensions: Long = 0,
resumes: Long = 0,
registers: Long = 0,
unregisters: Long = 0,
msgsReceived: Long = 0,
msgsProcessed: Long = 0,
restarts: Long = 0)(implicit system: ActorSystem) {
assertRef(actorRef, dispatcher)(
suspensions,
resumes,
registers,
unregisters,
msgsReceived,
msgsProcessed,
restarts)
}
def assertRef(actorRef: ActorRef, dispatcher: MessageDispatcher = null)(
suspensions: Long = statsFor(actorRef, dispatcher).suspensions.get(),
resumes: Long = statsFor(actorRef, dispatcher).resumes.get(),
registers: Long = statsFor(actorRef, dispatcher).registers.get(),
unregisters: Long = statsFor(actorRef, dispatcher).unregisters.get(),
msgsReceived: Long = statsFor(actorRef, dispatcher).msgsReceived.get(),
msgsProcessed: Long = statsFor(actorRef, dispatcher).msgsProcessed.get(),
restarts: Long = statsFor(actorRef, dispatcher).restarts.get())(implicit system: ActorSystem) {
val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].dispatcher))
val deadline = System.currentTimeMillis + 1000
try {
await(deadline)(stats.suspensions.get() == suspensions)
await(deadline)(stats.resumes.get() == resumes)
await(deadline)(stats.registers.get() == registers)
await(deadline)(stats.unregisters.get() == unregisters)
await(deadline)(stats.msgsReceived.get() == msgsReceived)
await(deadline)(stats.msgsProcessed.get() == msgsProcessed)
await(deadline)(stats.restarts.get() == restarts)
} catch {
case e: Throwable ⇒
system.eventStream.publish(Error(e,
Option(dispatcher).toString,
(Option(dispatcher) getOrElse this).getClass,
"actual: " + stats + ", required: InterceptorStats(susp=" + suspensions +
",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters +
",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts))
throw e
}
}
@tailrec def await(until: Long)(condition: ⇒ Boolean): Unit =
if (System.currentTimeMillis() <= until) {
var done = false
try {
done = condition
if (!done) Thread.sleep(25)
} catch {
case e: InterruptedException ⇒
}
if (!done) await(until)(condition)
} else throw new AssertionError("await failed")
}
abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with DefaultTimeout {
import ActorModelSpec._
def newTestActor(dispatcher: String) = system.actorOf(Props[DispatcherActor].withDispatcher(dispatcher))
def awaitStarted(ref: ActorRef): Unit = {
awaitCond(ref match {
case r: RepointableRef ⇒ r.isStarted
case _ ⇒ true
}, 1 second, 10 millis)
}
protected def interceptedDispatcher(): MessageDispatcherInterceptor
protected def dispatcherType: String
"A " + dispatcherType must {
"dynamically handle its own life cycle" in {
implicit val dispatcher = interceptedDispatcher()
assertDispatcher(dispatcher)(stops = 0)
val a = newTestActor(dispatcher.id)
assertDispatcher(dispatcher)(stops = 0)
system.stop(a)
assertDispatcher(dispatcher)(stops = 1)
assertRef(a, dispatcher)(
suspensions = 0,
resumes = 0,
registers = 1,
unregisters = 1,
msgsReceived = 0,
msgsProcessed = 0,
restarts = 0)
val futures = for (i ← 1 to 10) yield Future {
i
}
assertDispatcher(dispatcher)(stops = 2)
val a2 = newTestActor(dispatcher.id)
val futures2 = for (i ← 1 to 10) yield Future { i }
assertDispatcher(dispatcher)(stops = 2)
system.stop(a2)
assertDispatcher(dispatcher)(stops = 3)
}
"process messages one at a time" in {
implicit val dispatcher = interceptedDispatcher()
val start, oneAtATime = new CountDownLatch(1)
val a = newTestActor(dispatcher.id)
awaitStarted(a)
a ! CountDown(start)
assertCountDown(start, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1)
a ! Wait(1000)
a ! CountDown(oneAtATime)
// in case of serialization violation, restart would happen instead of count down
assertCountDown(oneAtATime, (1.5 seconds).dilated.toMillis, "Processed message when allowed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3)
system.stop(a)
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3)
}
"handle queueing from multiple threads" in {
implicit val dispatcher = interceptedDispatcher()
val counter = new CountDownLatch(200)
val a = newTestActor(dispatcher.id)
for (i ← 1 to 10) {
spawn {
for (i ← 1 to 20) {
a ! WaitAck(1, counter)
}
}
}
assertCountDown(counter, 3.seconds.dilated.toMillis, "Should process 200 messages")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)
system.stop(a)
}
def spawn(f: ⇒ Unit) {
(new Thread {
override def run(): Unit =
try f catch {
case e: Throwable ⇒ system.eventStream.publish(Error(e, "spawn", this.getClass, "error in spawned thread"))
}
}).start()
}
"not process messages for a suspended actor" in {
implicit val dispatcher = interceptedDispatcher()
val a = newTestActor(dispatcher.id).asInstanceOf[InternalActorRef]
awaitStarted(a)
val done = new CountDownLatch(1)
a.suspend
a ! CountDown(done)
assertNoCountDown(done, 1000, "Should not process messages while suspended")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1)
a.resume(causedByFailure = null)
assertCountDown(done, 3.seconds.dilated.toMillis, "Should resume processing of messages when resumed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
suspensions = 1, resumes = 1)
system.stop(a)
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1,
suspensions = 1, resumes = 1)
}
"handle waves of actors" in {
val dispatcher = interceptedDispatcher()
val props = Props[DispatcherActor].withDispatcher(dispatcher.id)
def flood(num: Int) {
val cachedMessage = CountDownNStop(new CountDownLatch(num))
val stopLatch = new CountDownLatch(num)
val keepAliveLatch = new CountDownLatch(1)
val waitTime = (20 seconds).dilated.toMillis
val boss = system.actorOf(Props(new Actor {
def receive = {
case "run" ⇒ for (_ ← 1 to num) (context.watch(context.actorOf(props))) ! cachedMessage
case Terminated(child) ⇒ stopLatch.countDown()
}
}).withDispatcher("boss"))
try {
// this future is meant to keep the dispatcher alive until the end of the test run even if
// the boss doesn't create children fast enough to keep the dispatcher from becoming empty
// and it needs to be on a separate thread to not deadlock the calling thread dispatcher
new Thread(new Runnable {
def run() = Future {
keepAliveLatch.await(waitTime, TimeUnit.MILLISECONDS)
}(dispatcher)
}).start()
boss ! "run"
try {
assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num)
} catch {
case e: Throwable ⇒
dispatcher match {
case dispatcher: BalancingDispatcher ⇒
val team = dispatcher.team
val mq = dispatcher.messageQueue
System.err.println("Teammates left: " + team.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhabitants)
team.toArray sorted new Ordering[AnyRef] {
def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path compareTo rr.self.path }
} foreach {
case cell: ActorCell ⇒
System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " "
+ cell.mailbox.numberOfMessages + " " + cell.mailbox.systemDrain(SystemMessageList.LNil).size)
}
System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages)
Iterator.continually(mq.dequeue) takeWhile (_ ne null) foreach System.err.println
case _ ⇒
}
throw e
}
assertCountDown(stopLatch, waitTime, "Expected all children to stop")
} finally {
keepAliveLatch.countDown()
system.stop(boss)
}
}
for (run ← 1 to 3) {
flood(50000)
assertDispatcher(dispatcher)(stops = run)
}
}
"continue to process messages when a thread gets interrupted and throws an exception" in {
filterEvents(
EventFilter[InterruptedException](),
EventFilter[ActorInterruptedException](),
EventFilter[akka.event.Logging.LoggerException]()) {
implicit val dispatcher = interceptedDispatcher()
val a = newTestActor(dispatcher.id)
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? Interrupt
Thread.interrupted() // CallingThreadDispatcher may necessitate this
val f4 = a ? Reply("foo2")
val f5 = a ? Interrupt
Thread.interrupted() // CallingThreadDispatcher may necessitate this
val f6 = a ? Reply("bar2")
val c = system.scheduler.scheduleOnce(2.seconds) {
import collection.JavaConverters._
Thread.getAllStackTraces().asScala foreach {
case (thread, stack) ⇒
println(s"$thread:")
stack foreach (s ⇒ println(s"\t$s"))
}
}
assert(Await.result(f1, timeout.duration) === "foo")
assert(Await.result(f2, timeout.duration) === "bar")
assert(Await.result(f4, timeout.duration) === "foo2")
assert(intercept[ActorInterruptedException](Await.result(f3, timeout.duration)).getCause.getMessage === "Ping!")
assert(Await.result(f6, timeout.duration) === "bar2")
assert(intercept[ActorInterruptedException](Await.result(f5, timeout.duration)).getCause.getMessage === "Ping!")
c.cancel()
Thread.sleep(300) // give the EventFilters a chance of catching all messages
}
}
"continue to process messages without failure when a thread gets interrupted and doesn't throw an exception" in {
filterEvents(EventFilter[InterruptedException]()) {
implicit val dispatcher = interceptedDispatcher()
val a = newTestActor(dispatcher.id)
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? InterruptNicely("baz")
val f4 = a ? Reply("foo2")
val f5 = a ? InterruptNicely("baz2")
val f6 = a ? Reply("bar2")
assert(Await.result(f1, timeout.duration) === "foo")
assert(Await.result(f2, timeout.duration) === "bar")
assert(Await.result(f3, timeout.duration) === "baz")
assert(Await.result(f4, timeout.duration) === "foo2")
assert(Await.result(f5, timeout.duration) === "baz2")
assert(Await.result(f6, timeout.duration) === "bar2")
// clear the interrupted flag (only needed for the CallingThreadDispatcher) so the next test can continue normally
Thread.interrupted()
}
}
"continue to process messages when exception is thrown" in {
filterEvents(EventFilter[IndexOutOfBoundsException](), EventFilter[RemoteException]()) {
implicit val dispatcher = interceptedDispatcher()
val a = newTestActor(dispatcher.id)
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? ThrowException(new IndexOutOfBoundsException("IndexOutOfBoundsException"))
val f4 = a ? Reply("foo2")
val f5 = a ? ThrowException(new RemoteException("RemoteException"))
val f6 = a ? Reply("bar2")
assert(Await.result(f1, timeout.duration) === "foo")
assert(Await.result(f2, timeout.duration) === "bar")
assert(Await.result(f4, timeout.duration) === "foo2")
assert(Await.result(f6, timeout.duration) === "bar2")
assert(f3.value.isEmpty)
assert(f5.value.isEmpty)
}
}
"not double-deregister" in {
implicit val dispatcher = interceptedDispatcher()
for (i ← 1 to 1000) system.actorOf(Props.empty)
val a = newTestActor(dispatcher.id)
a ! DoubleStop
awaitCond(statsFor(a, dispatcher).registers.get == 1)
awaitCond(statsFor(a, dispatcher).unregisters.get == 1)
}
}
}
object DispatcherModelSpec {
import ActorModelSpec._
val config = {
"""
boss {
executor = thread-pool-executor
type = PinnedDispatcher
}
""" +
// use unique dispatcher id for each test, since MessageDispatcherInterceptor holds state
(for (n ← 1 to 30) yield """
test-dispatcher-%s {
type = "akka.actor.dispatch.DispatcherModelSpec$MessageDispatcherInterceptorConfigurator"
}""".format(n)).mkString
}
class MessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
import akka.util.Helpers.ConfigOps
private val instance: MessageDispatcher =
new Dispatcher(this,
config.getString("id"),
config.getInt("throughput"),
config.getNanosDuration("throughput-deadline-time"),
configureExecutor(),
config.getMillisDuration("shutdown-timeout")) with MessageDispatcherInterceptor
override def dispatcher(): MessageDispatcher = instance
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DispatcherModelSpec extends ActorModelSpec(DispatcherModelSpec.config) {
import ActorModelSpec._
val dispatcherCount = new AtomicInteger()
override def interceptedDispatcher(): MessageDispatcherInterceptor = {
// use new id for each test, since the MessageDispatcherInterceptor holds state
system.dispatchers.lookup("test-dispatcher-" + dispatcherCount.incrementAndGet()).asInstanceOf[MessageDispatcherInterceptor]
}
override def dispatcherType = "Dispatcher"
"A " + dispatcherType must {
"process messages in parallel" in {
implicit val dispatcher = interceptedDispatcher()
val aStart, aStop, bParallel = new CountDownLatch(1)
val a, b = newTestActor(dispatcher.id)
a ! Meet(aStart, aStop)
assertCountDown(aStart, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds")
b ! CountDown(bParallel)
assertCountDown(bParallel, 3.seconds.dilated.toMillis, "Should process other actors in parallel")
aStop.countDown()
system.stop(a)
system.stop(b)
while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
}
}
}
object BalancingDispatcherModelSpec {
import ActorModelSpec._
// TODO check why throughput=1 here? (came from old test)
val config = {
"""
boss {
executor = thread-pool-executor
type = PinnedDispatcher
}
""" +
// use unique dispatcher id for each test, since MessageDispatcherInterceptor holds state
(for (n ← 1 to 30) yield """
test-balancing-dispatcher-%s {
type = "akka.actor.dispatch.BalancingDispatcherModelSpec$BalancingMessageDispatcherInterceptorConfigurator"
throughput=1
}""".format(n)).mkString
}
class BalancingMessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends BalancingDispatcherConfigurator(config, prerequisites) {
import akka.util.Helpers.ConfigOps
override protected def create(mailboxType: MailboxType): BalancingDispatcher =
new BalancingDispatcher(this,
config.getString("id"),
config.getInt("throughput"),
config.getNanosDuration("throughput-deadline-time"),
mailboxType,
configureExecutor(),
config.getMillisDuration("shutdown-timeout"),
config.getBoolean("attempt-teamwork")) with MessageDispatcherInterceptor
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class BalancingDispatcherModelSpec extends ActorModelSpec(BalancingDispatcherModelSpec.config) {
import ActorModelSpec._
val dispatcherCount = new AtomicInteger()
override def interceptedDispatcher(): MessageDispatcherInterceptor = {
// use new id for each test, since the MessageDispatcherInterceptor holds state
system.dispatchers.lookup("test-balancing-dispatcher-" + dispatcherCount.incrementAndGet()).asInstanceOf[MessageDispatcherInterceptor]
}
override def dispatcherType = "Balancing Dispatcher"
"A " + dispatcherType must {
"process messages in parallel" in {
implicit val dispatcher = interceptedDispatcher()
val aStart, aStop, bParallel = new CountDownLatch(1)
val a, b = newTestActor(dispatcher.id)
a ! Meet(aStart, aStop)
assertCountDown(aStart, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds")
b ! CountDown(bParallel)
assertCountDown(bParallel, 3.seconds.dilated.toMillis, "Should process other actors in parallel")
aStop.countDown()
system.stop(a)
system.stop(b)
while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka ActorModelSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.