alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Scheduler.scala)

This example Akka source code file (Scheduler.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

akka, boolean, cancellable, collection, concurrent, dispatch, executioncontext, finiteduration, lars, runnable, schedulerexception, time, timertask, unit, utilities, wheelsize

The Scheduler.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.actor

import java.io.Closeable
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.{ AtomicLong, AtomicReference, AtomicReferenceArray }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.duration._
import scala.util.control.{ NoStackTrace, NonFatal }
import com.typesafe.config.Config
import akka.event.LoggingAdapter
import akka.util.Helpers
import akka.util.Unsafe.{ instance ⇒ unsafe }
import akka.dispatch.AbstractNodeQueue

/**
 * This exception is thrown by Scheduler.schedule* when scheduling is not
 * possible, e.g. after shutting down the Scheduler.
 */
private final case class SchedulerException(msg: String) extends akka.AkkaException(msg) with NoStackTrace

// The Scheduler trait is included in the documentation. KEEP THE LINES SHORT!!!
//#scheduler
/**
 * An Akka scheduler service. This one needs one special behavior: if
 * Closeable, it MUST execute all outstanding tasks upon .close() in order
 * to properly shutdown all dispatchers.
 *
 * Furthermore, this timer service MUST throw IllegalStateException if it
 * cannot schedule a task. Once scheduled, the task MUST be executed. If
 * executed upon close(), the task may execute before its timeout.
 *
 * Scheduler implementation are loaded reflectively at ActorSystem start-up
 * with the following constructor arguments:
 *  1) the system’s com.typesafe.config.Config (from system.settings.config)
 *  2) a akka.event.LoggingAdapter
 *  3) a java.util.concurrent.ThreadFactory
 */
trait Scheduler {
  /**
   * Schedules a message to be sent repeatedly with an initial delay and
   * frequency. E.g. if you would like a message to be sent immediately and
   * thereafter every 500ms you would set delay=Duration.Zero and
   * interval=Duration(500, TimeUnit.MILLISECONDS)
   *
   * Java & Scala API
   */
  final def schedule(
    initialDelay: FiniteDuration,
    interval: FiniteDuration,
    receiver: ActorRef,
    message: Any)(implicit executor: ExecutionContext,
                  sender: ActorRef = Actor.noSender): Cancellable =
    schedule(initialDelay, interval, new Runnable {
      def run = {
        receiver ! message
        if (receiver.isTerminated)
          throw new SchedulerException("timer active for terminated actor")
      }
    })

  /**
   * Schedules a function to be run repeatedly with an initial delay and a
   * frequency. E.g. if you would like the function to be run after 2 seconds
   * and thereafter every 100ms you would set delay = Duration(2, TimeUnit.SECONDS)
   * and interval = Duration(100, TimeUnit.MILLISECONDS)
   *
   * Scala API
   */
  final def schedule(
    initialDelay: FiniteDuration,
    interval: FiniteDuration)(f: ⇒ Unit)(
      implicit executor: ExecutionContext): Cancellable =
    schedule(initialDelay, interval, new Runnable { override def run = f })

  /**
   * Schedules a function to be run repeatedly with an initial delay and
   * a frequency. E.g. if you would like the function to be run after 2
   * seconds and thereafter every 100ms you would set delay = Duration(2,
   * TimeUnit.SECONDS) and interval = Duration(100, TimeUnit.MILLISECONDS)
   *
   * Java API
   */
  def schedule(
    initialDelay: FiniteDuration,
    interval: FiniteDuration,
    runnable: Runnable)(implicit executor: ExecutionContext): Cancellable

  /**
   * Schedules a message to be sent once with a delay, i.e. a time period that has
   * to pass before the message is sent.
   *
   * Java & Scala API
   */
  final def scheduleOnce(
    delay: FiniteDuration,
    receiver: ActorRef,
    message: Any)(implicit executor: ExecutionContext,
                  sender: ActorRef = Actor.noSender): Cancellable =
    scheduleOnce(delay, new Runnable {
      override def run = receiver ! message
    })

  /**
   * Schedules a function to be run once with a delay, i.e. a time period that has
   * to pass before the function is run.
   *
   * Scala API
   */
  final def scheduleOnce(delay: FiniteDuration)(f: ⇒ Unit)(
    implicit executor: ExecutionContext): Cancellable =
    scheduleOnce(delay, new Runnable { override def run = f })

  /**
   * Schedules a Runnable to be run once with a delay, i.e. a time period that
   * has to pass before the runnable is executed.
   *
   * Java & Scala API
   */
  def scheduleOnce(
    delay: FiniteDuration,
    runnable: Runnable)(implicit executor: ExecutionContext): Cancellable

  /**
   * The maximum supported task frequency of this scheduler, i.e. the inverse
   * of the minimum time interval between executions of a recurring task, in Hz.
   */
  def maxFrequency: Double

}
//#scheduler

// this one is just here so we can present a nice AbstractScheduler for Java
abstract class AbstractSchedulerBase extends Scheduler

//#cancellable
/**
 * Signifies something that can be cancelled
 * There is no strict guarantee that the implementation is thread-safe,
 * but it should be good practice to make it so.
 */
trait Cancellable {
  /**
   * Cancels this Cancellable and returns true if that was successful.
   * If this cancellable was (concurrently) cancelled already, then this method
   * will return false although isCancelled will return true.
   *
   * Java & Scala API
   */
  def cancel(): Boolean

  /**
   * Returns true if and only if this Cancellable has been successfully cancelled
   *
   * Java & Scala API
   */
  def isCancelled: Boolean
}
//#cancellable

/**
 * This scheduler implementation is based on a revolving wheel of buckets,
 * like Netty’s HashedWheelTimer, which it advances at a fixed tick rate and
 * dispatches tasks it finds in the current bucket to their respective
 * ExecutionContexts. The tasks are held in TaskHolders, which upon
 * cancellation null out their reference to the actual task, leaving only this
 * shell to be cleaned up when the wheel reaches that bucket next time. This
 * enables the use of a simple linked list to chain the TaskHolders off the
 * wheel.
 *
 * Also noteworthy is that this scheduler does not obtain a current time stamp
 * when scheduling single-shot tasks, instead it always rounds up the task
 * delay to a full multiple of the TickDuration. This means that tasks are
 * scheduled possibly one tick later than they could be (if checking that
 * “now() + delay <= nextTick” were done).
 */
class LightArrayRevolverScheduler(config: Config,
                                  log: LoggingAdapter,
                                  threadFactory: ThreadFactory)
  extends Scheduler with Closeable {

  import Helpers.Requiring
  import Helpers.ConfigOps

  val WheelSize =
    config.getInt("akka.scheduler.ticks-per-wheel")
      .requiring(ticks ⇒ (ticks & (ticks - 1)) == 0, "ticks-per-wheel must be a power of 2")
  val TickDuration =
    config.getMillisDuration("akka.scheduler.tick-duration")
      .requiring(_ >= 10.millis || !Helpers.isWindows, "minimum supported akka.scheduler.tick-duration on Windows is 10ms")
      .requiring(_ >= 1.millis, "minimum supported akka.scheduler.tick-duration is 1ms")
  val ShutdownTimeout = config.getMillisDuration("akka.scheduler.shutdown-timeout")

  import LightArrayRevolverScheduler._

  private val oneNs = Duration.fromNanos(1l)
  private def roundUp(d: FiniteDuration): FiniteDuration =
    try {
      ((d + TickDuration - oneNs) / TickDuration).toLong * TickDuration
    } catch {
      case _: IllegalArgumentException ⇒ d // rouding up Long.MaxValue.nanos overflows
    }

  /**
   * Clock implementation is replaceable (for testing); the implementation must
   * return a monotonically increasing series of Long nanoseconds.
   */
  protected def clock(): Long = System.nanoTime

  /**
   * Overridable for tests
   */
  protected def getShutdownTimeout: FiniteDuration = ShutdownTimeout

  /**
   * Overridable for tests
   */
  protected def waitNanos(nanos: Long): Unit = {
    // see http://www.javamex.com/tutorials/threads/sleep_issues.shtml
    val sleepMs = if (Helpers.isWindows) (nanos + 4999999) / 10000000 * 10 else (nanos + 999999) / 1000000
    try Thread.sleep(sleepMs) catch {
      case _: InterruptedException ⇒ Thread.currentThread.interrupt() // we got woken up
    }
  }

  override def schedule(initialDelay: FiniteDuration,
                        delay: FiniteDuration,
                        runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = {
    checkMaxDelay(roundUp(delay).toNanos)
    val preparedEC = executor.prepare()
    try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self ⇒
      compareAndSet(InitialRepeatMarker, schedule(
        preparedEC,
        new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
          override def run(): Unit = {
            try {
              runnable.run()
              val driftNanos = clock() - getAndAdd(delay.toNanos)
              if (self.get != null)
                swap(schedule(preparedEC, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1))))
            } catch {
              case _: SchedulerException ⇒ // ignore failure to enqueue or terminated target actor
            }
          }
        }, roundUp(initialDelay)))

      @tailrec private def swap(c: Cancellable): Unit = {
        get match {
          case null ⇒ if (c != null) c.cancel()
          case old  ⇒ if (!compareAndSet(old, c)) swap(c)
        }
      }

      @tailrec final def cancel(): Boolean = {
        get match {
          case null ⇒ false
          case c ⇒
            if (c.cancel()) compareAndSet(c, null)
            else compareAndSet(c, null) || cancel()
        }
      }

      override def isCancelled: Boolean = get == null
    } catch {
      case SchedulerException(msg) ⇒ throw new IllegalStateException(msg)
    }
  }

  override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
    try schedule(executor.prepare(), runnable, roundUp(delay))
    catch {
      case SchedulerException(msg) ⇒ throw new IllegalStateException(msg)
    }

  override def close(): Unit = Await.result(stop(), getShutdownTimeout) foreach {
    task ⇒
      try task.run() catch {
        case e: InterruptedException ⇒ throw e
        case _: SchedulerException   ⇒ // ignore terminated actors
        case NonFatal(e)             ⇒ log.error(e, "exception while executing timer task")
      }
  }

  override val maxFrequency: Double = 1.second / TickDuration

  /*
   * BELOW IS THE ACTUAL TIMER IMPLEMENTATION
   */

  private val start = clock()
  private val tickNanos = TickDuration.toNanos
  private val wheelMask = WheelSize - 1
  private val queue = new TaskQueue

  private def schedule(ec: ExecutionContext, r: Runnable, delay: FiniteDuration): TimerTask =
    if (delay <= Duration.Zero) {
      if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown")
      ec.execute(r)
      NotCancellable
    } else if (stopped.get != null) {
      throw new SchedulerException("cannot enqueue after timer shutdown")
    } else {
      val delayNanos = delay.toNanos
      checkMaxDelay(delayNanos)

      val ticks = (delayNanos / tickNanos).toInt
      val task = new TaskHolder(r, ticks, ec)
      queue.add(task)
      if (stopped.get != null && task.cancel())
        throw new SchedulerException("cannot enqueue after timer shutdown")
      task
    }

  private def checkMaxDelay(delayNanos: Long): Unit =
    if (delayNanos / tickNanos > Int.MaxValue)
      // 1 second margin in the error message due to rounding
      throw new IllegalArgumentException(s"Task scheduled with [${delayNanos.nanos.toSeconds}] seconds delay, " +
        s"which is too far in future, maximum delay is [${(tickNanos * Int.MaxValue).nanos.toSeconds - 1}] seconds")

  private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]]
  private def stop(): Future[immutable.Seq[TimerTask]] = {
    val p = Promise[immutable.Seq[TimerTask]]()
    if (stopped.compareAndSet(null, p)) {
      // Interrupting the timer thread to make it shut down faster is not good since
      // it could be in the middle of executing the scheduled tasks, which might not
      // respond well to being interrupted.
      // Instead we just wait one more tick for it to finish.
      p.future
    } else Future.successful(Nil)
  }

  @volatile private var timerThread: Thread = threadFactory.newThread(new Runnable {

    var tick = 0
    val wheel = Array.fill(WheelSize)(new TaskQueue)

    private def clearAll(): immutable.Seq[TimerTask] = {
      @tailrec def collect(q: TaskQueue, acc: Vector[TimerTask]): Vector[TimerTask] = {
        q.poll() match {
          case null ⇒ acc
          case x    ⇒ collect(q, acc :+ x)
        }
      }
      ((0 until WheelSize) flatMap (i ⇒ collect(wheel(i), Vector.empty))) ++ collect(queue, Vector.empty)
    }

    @tailrec
    private def checkQueue(time: Long): Unit = queue.pollNode() match {
      case null ⇒ ()
      case node ⇒
        node.value.ticks match {
          case 0 ⇒ node.value.executeTask()
          case ticks ⇒
            val futureTick = ((
              time - start + // calculate the nanos since timer start
              (ticks * tickNanos) + // adding the desired delay
              tickNanos - 1 // rounding up
              ) / tickNanos).toInt // and converting to slot number
            // tick is an Int that will wrap around, but toInt of futureTick gives us modulo operations
            // and the difference (offset) will be correct in any case 
            val offset = futureTick - tick
            val bucket = futureTick & wheelMask
            node.value.ticks = offset
            wheel(bucket).addNode(node)
        }
        checkQueue(time)
    }

    override final def run =
      try nextTick()
      catch {
        case t: Throwable ⇒
          log.error(t, "exception on LARS’ timer thread")
          stopped.get match {
            case null ⇒
              val thread = threadFactory.newThread(this)
              log.info("starting new LARS thread")
              try thread.start()
              catch {
                case e: Throwable ⇒
                  log.error(e, "LARS cannot start new thread, ship’s going down!")
                  stopped.set(Promise successful Nil)
                  clearAll()
              }
              timerThread = thread
            case p ⇒
              assert(stopped.compareAndSet(p, Promise successful Nil), "Stop signal violated in LARS")
              p success clearAll()
          }
          throw t
      }

    @tailrec final def nextTick(): Unit = {
      val time = clock()
      val sleepTime = start + (tick * tickNanos) - time

      if (sleepTime > 0) {
        // check the queue before taking a nap
        checkQueue(time)
        waitNanos(sleepTime)
      } else {
        val bucket = tick & wheelMask
        val tasks = wheel(bucket)
        val putBack = new TaskQueue

        @tailrec def executeBucket(): Unit = tasks.pollNode() match {
          case null ⇒ ()
          case node ⇒
            val task = node.value
            if (!task.isCancelled) {
              if (task.ticks >= WheelSize) {
                task.ticks -= WheelSize
                putBack.addNode(node)
              } else task.executeTask()
            }
            executeBucket()
        }
        executeBucket()
        wheel(bucket) = putBack

        tick += 1
      }
      stopped.get match {
        case null ⇒ nextTick()
        case p ⇒
          assert(stopped.compareAndSet(p, Promise successful Nil), "Stop signal violated in LARS")
          p success clearAll()
      }
    }
  })

  timerThread.start()
}

object LightArrayRevolverScheduler {
  private[this] val taskOffset = unsafe.objectFieldOffset(classOf[TaskHolder].getDeclaredField("task"))

  private class TaskQueue extends AbstractNodeQueue[TaskHolder]

  /**
   * INTERNAL API
   */
  protected[actor] trait TimerTask extends Runnable with Cancellable

  /**
   * INTERNAL API
   */
  protected[actor] class TaskHolder(@volatile var task: Runnable, var ticks: Int, executionContext: ExecutionContext)
    extends TimerTask {

    @tailrec
    private final def extractTask(replaceWith: Runnable): Runnable =
      task match {
        case t @ (ExecutedTask | CancelledTask) ⇒ t
        case x                                  ⇒ if (unsafe.compareAndSwapObject(this, taskOffset, x, replaceWith)) x else extractTask(replaceWith)
      }

    private[akka] final def executeTask(): Boolean = extractTask(ExecutedTask) match {
      case ExecutedTask | CancelledTask ⇒ false
      case other ⇒
        try {
          executionContext execute other
          true
        } catch {
          case _: InterruptedException ⇒ { Thread.currentThread.interrupt(); false }
          case NonFatal(e)             ⇒ { executionContext.reportFailure(e); false }
        }
    }

    // This should only be called in execDirectly
    override def run(): Unit = extractTask(ExecutedTask).run()

    override def cancel(): Boolean = extractTask(CancelledTask) match {
      case ExecutedTask | CancelledTask ⇒ false
      case _                            ⇒ true
    }

    override def isCancelled: Boolean = task eq CancelledTask
  }

  private[this] val CancelledTask = new Runnable { def run = () }
  private[this] val ExecutedTask = new Runnable { def run = () }

  private val NotCancellable: TimerTask = new TimerTask {
    def cancel(): Boolean = false
    def isCancelled: Boolean = false
    def run(): Unit = ()
  }

  private val InitialRepeatMarker: Cancellable = new Cancellable {
    def cancel(): Boolean = false
    def isCancelled: Boolean = false
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka Scheduler.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.