Scala example source code file (Actor.scala)

This example Scala source code file (Actor.scala) is included in the alvinalexander.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Scala by Example" TM.

Learn more about this Scala project at its project page.

Java - Scala tags/keywords

actor, actorfunctions, actorinstances, actorutils, atomicreference, contravariant, int, node, run, strategy, throwable, tofunctionfromactor, unit

The Actor.scala Scala example source code

package scalaz
package concurrent

import java.util.concurrent.atomic.AtomicReference

/**
 * Processes messages of type `A`, one at a time. Messages are submitted to
 * the actor with the method `!`. Processing is typically performed asynchronously;
 * this is controlled by the provided `strategy`.
 *
 * Memory consistency guarantee: when each message is processed by the `handler`, any memory that it
 * mutates is guaranteed to be visible to the `handler` when it processes the next message, even if
 * the `strategy` runs the invocations of `handler` on separate threads. This is achieved because
 * the `Actor` reads a volatile memory location before entering its event loop, and writes to the same
 * location before suspending.
 *
 * Implementation based on the non-intrusive MPSC (multi-producer, single-consumer) node-based queue described by Dmitriy Vyukov:
 * [[http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue]]
 *
 * @param handler  The message handler
 * @param onError  Exception handler, called if the message handler throws any `Throwable`.
 * @param strategy Execution strategy, for example, a strategy that is backed by an `ExecutorService`
 * @tparam A       The type of messages accepted by this actor.
 */
final case class Actor[A](handler: A => Unit, onError: Throwable => Unit = ActorUtils.rethrowError)
                         (implicit val strategy: Strategy) {
  private[this] val head = new AtomicReference[Node[A]]

  val toEffect: Run[A] = Run[A](a => this ! a)

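  /** Pass the message `a` to the mailbox of this actor */
  def !(a: A): Unit = {
    val n = new Node(a)
    // Atomically publish `n` as the most recently enqueued node.
    val h = head.getAndSet(n)
    // If a previous node exists, link it to `n` so the consumer can reach it.
    // Otherwise the mailbox was empty and no run is in flight, so start one at `n`.
    if (h ne null) h.lazySet(n)
    else schedule(n)
  }

  /** Alias for `!` */
  def apply(a: A): Unit = this ! a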

  def contramap[B](f: B => A): Actor[B] = new Actor[B](b => this ! f(b), onError)(strategy)

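  private def schedule(n: Node[A]): Unit = strategy(act(n))

  // Handles the message in `n`, then keeps draining linked successors. After a
  // batch of about 1024 messages it reschedules itself through the `strategy`
  // instead of continuing in the same run.
  @annotation.tailrec
  private def act(n: Node[A], i: Int = 1024): Unit = {
    try handler(n.a) catch {
      case ex: Throwable => onError(ex)
    }
    val n2 = n.get
    if (n2 eq null) scheduleLastTry(n)   // no successor linked yet
    else if (i == 0) schedule(n2)        // batch exhausted: resume later at n2
    else act(n2, i - 1)
  }

  private def scheduleLastTry(n: Node[A]): Unit = strategy(lastTry(n))

  // If `n` is still the head, the mailbox is empty: reset it to null and suspend.
  // Otherwise a producer has already swapped in a new head, so wait for its
  // link to appear and continue processing from there.
  private def lastTry(n: Node[A]): Unit = if (!head.compareAndSet(n, null)) act(next(n))

  // Spins until the producer that replaced `n` as head has linked its node to `n`.
  @annotation.tailrec
  private def next(n: Node[A]): Node[A] = {
    val n2 = n.get
    if (n2 ne null) n2
    else next(n)
  }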
}

private class Node[A](val a: A) extends AtomicReference[Node[A]]

private object ActorUtils {
  val rethrowError: Throwable => Unit = throw _
}

object Actor extends ActorInstances with ActorFunctions

sealed abstract class ActorInstances {
  implicit val actorContravariant: Contravariant[Actor] = new Contravariant[Actor] {
    def contramap[A, B](r: Actor[A])(f: B => A): Actor[B] = r contramap f
  }
}

trait ActorFunctions {
  def actor[A](handler: A => Unit, onError: Throwable => Unit = ActorUtils.rethrowError)
              (implicit s: Strategy): Actor[A] = new Actor[A](handler, onError)(s)

  implicit def ToFunctionFromActor[A](a: Actor[A]): A => Unit = a ! _
}
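
As a rough illustration of how the listing above might be used, the following sketch sends messages with `!` and `apply`, and adapts the actor's input type with `contramap`. It assumes a scalaz 7.x release that still ships the `scalaz.concurrent` package on the classpath. The object name `ActorUsageSketch` and the `received` buffer are hypothetical and exist only for this example. `Strategy.Sequential` is passed explicitly so that every message is handled on the calling thread, which keeps the result deterministic.

```scala
import scala.collection.mutable.ListBuffer
import scalaz.concurrent.{Actor, Strategy}

// Hypothetical demo object; not part of scalaz.
object ActorUsageSketch {
  // Collects every message the handler sees (illustration only).
  val received = ListBuffer.empty[Int]

  // Strategy.Sequential evaluates each scheduled run on the calling thread,
  // so `!` returns only after the handler has finished.
  val ints: Actor[Int] =
    Actor[Int](n => { received += n; () })(Strategy.Sequential)

  // contramap builds an Actor[String] that forwards each string's length
  // to the underlying Actor[Int].
  val lengths: Actor[String] = ints.contramap((s: String) => s.length)

  def run(): List[Int] = {
    ints ! 1        // send with `!`
    ints(2)         // `apply` forwards to `!`
    lengths ! "abc" // arrives at `ints` as 3
    received.toList
  }
}
```

With a strategy backed by a thread pool the calls to `!` would return before the handler runs, so a real program would need its own way to wait for completion.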

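The memory consistency guarantee described in the class documentation can be illustrated with a handler that mutates a plain, non-volatile `var` while several threads send messages. This is a minimal sketch, again assuming a scalaz 7.x release with `scalaz.concurrent` available. `ActorCounterSketch`, `sumWithActor`, and its parameters are hypothetical names introduced for the example, and the `CountDownLatch` is only there so the caller can wait until every message has been handled.

```scala
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
import scalaz.concurrent.{Actor, Strategy}

// Hypothetical demo object; not part of scalaz.
object ActorCounterSketch {
  def sumWithActor(producers: Int = 4, perProducer: Int = 1000): Int = {
    val pool: ExecutorService = Executors.newFixedThreadPool(4)
    try {
      // Runs of the actor are submitted to the thread pool.
      val strategy: Strategy = Strategy.Executor(pool)
      val done = new CountDownLatch(producers * perProducer)

      // Plain mutable state, touched only from inside the handler.
      var count = 0
      val counter = Actor[Int]({ n => count += n; done.countDown() })(strategy)

      // Several producer threads send concurrently; the actor still
      // handles the messages one at a time.
      val threads = (1 to producers).map { _ =>
        new Thread(new Runnable {
          def run(): Unit = (1 to perProducer).foreach(_ => counter ! 1)
        })
      }
      threads.foreach(_.start())
      threads.foreach(_.join())

      // Wait until every message has been handled before reading `count`.
      done.await(30, TimeUnit.SECONDS)
      count
    } finally pool.shutdown()
  }
}
```

Because the handler never runs concurrently with itself, the unsynchronized `count += n` does not lose updates, and the latch gives the calling thread a safe point at which to read the total.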
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.