|
Akka/Scala example source code file (ExecutionContextSpec.scala)
The ExecutionContextSpec.scala Akka example source codepackage akka.dispatch import java.util.concurrent.{ ExecutorService, Executor, Executors } import java.util.concurrent.atomic.AtomicInteger import scala.concurrent._ import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout } import akka.util.SerializedSuspendableExecutionContext @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { "An ExecutionContext" must { "be instantiable" in { val es = Executors.newCachedThreadPool() try { val executor: Executor with ExecutionContext = ExecutionContext.fromExecutor(es) executor should not be (null) val executorService: ExecutorService with ExecutionContext = ExecutionContext.fromExecutorService(es) executorService should not be (null) val jExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(es) jExecutor should not be (null) val jExecutorService: ExecutionContextExecutorService = ExecutionContexts.fromExecutorService(es) jExecutorService should not be (null) } finally { es.shutdown } } "be able to use Batching" in { system.dispatcher.isInstanceOf[BatchingExecutor] should be(true) import system.dispatcher def batchable[T](f: ⇒ T)(implicit ec: ExecutionContext): Unit = ec.execute(new Batchable { override def isBatchable = true override def run: Unit = f }) val p = Promise[Unit]() batchable { val lock, callingThreadLock, count = new AtomicInteger(0) callingThreadLock.compareAndSet(0, 1) // Enable the lock (1 to 100) foreach { i ⇒ batchable { if (callingThreadLock.get != 0) p.tryFailure(new IllegalStateException("Batch was executed inline!")) else if (count.incrementAndGet == 100) p.trySuccess(()) //Done else if (lock.compareAndSet(0, 1)) { try Thread.sleep(10) finally lock.compareAndSet(1, 0) } else p.tryFailure(new IllegalStateException("Executed batch in parallel!")) } } callingThreadLock.compareAndSet(1, 0) // Disable the lock } Await.result(p.future, timeout.duration) should be(()) } "be able to avoid starvation when Batching is used and Await/blocking is called" in { system.dispatcher.isInstanceOf[BatchingExecutor] should be(true) import system.dispatcher def batchable[T](f: ⇒ T)(implicit ec: ExecutionContext): Unit = ec.execute(new Batchable { override def isBatchable = true override def run: Unit = f }) val latch = TestLatch(101) batchable { (1 to 100) foreach { i ⇒ batchable { val deadlock = TestLatch(1) batchable { deadlock.open() } Await.ready(deadlock, timeout.duration) latch.countDown() } } latch.countDown() } Await.ready(latch, timeout.duration) } } "A SerializedSuspendableExecutionContext" must { "be suspendable and resumable" in { val sec = SerializedSuspendableExecutionContext(1)(ExecutionContext.global) val counter = new AtomicInteger(0) def perform(f: Int ⇒ Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) } perform(_ + 1) perform(x ⇒ { sec.suspend(); x * 2 }) awaitCond(counter.get == 2) perform(_ + 4) perform(_ * 2) sec.size should be(2) Thread.sleep(500) sec.size should be(2) counter.get should be(2) sec.resume() awaitCond(counter.get == 12) perform(_ * 2) awaitCond(counter.get == 24) sec.isEmpty should be(true) } "execute 'throughput' number of tasks per sweep" in { val submissions = new AtomicInteger(0) val counter = new AtomicInteger(0) val underlying = new ExecutionContext { override def execute(r: Runnable) { submissions.incrementAndGet(); ExecutionContext.global.execute(r) } override def reportFailure(t: Throwable) { ExecutionContext.global.reportFailure(t) } } val throughput = 25 val sec = SerializedSuspendableExecutionContext(throughput)(underlying) sec.suspend() def perform(f: Int ⇒ Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) } val total = 1000 1 to total foreach { _ ⇒ perform(_ + 1) } sec.size() should be(total) sec.resume() awaitCond(counter.get == total) submissions.get should be(total / throughput) sec.isEmpty should be(true) } "execute tasks in serial" in { val sec = SerializedSuspendableExecutionContext(1)(ExecutionContext.global) val total = 10000 val counter = new AtomicInteger(0) def perform(f: Int ⇒ Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) } 1 to total foreach { i ⇒ perform(c ⇒ if (c == (i - 1)) c + 1 else c) } awaitCond(counter.get == total) sec.isEmpty should be(true) } "relinquish thread when suspended" in { val submissions = new AtomicInteger(0) val counter = new AtomicInteger(0) val underlying = new ExecutionContext { override def execute(r: Runnable) { submissions.incrementAndGet(); ExecutionContext.global.execute(r) } override def reportFailure(t: Throwable) { ExecutionContext.global.reportFailure(t) } } val throughput = 25 val sec = SerializedSuspendableExecutionContext(throughput)(underlying) sec.suspend() def perform(f: Int ⇒ Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) } perform(_ + 1) 1 to 10 foreach { _ ⇒ perform(identity) } perform(x ⇒ { sec.suspend(); x * 2 }) perform(_ + 8) sec.size should be(13) sec.resume() awaitCond(counter.get == 2) sec.resume() awaitCond(counter.get == 10) sec.isEmpty should be(true) submissions.get should be(2) } } } Other Akka source code examplesHere is a short list of links related to this Akka ExecutionContextSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.