alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Agent.scala)

This example Akka source code file (Agent.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

agent, akka, b, concurrent, executioncontext, future, runnable, secretagent, some, t, u, unit, util

The Agent.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.agent

import scala.concurrent.stm._
import scala.concurrent.{ ExecutionContext, Future, Promise }
import akka.util.{ SerializedSuspendableExecutionContext }

object Agent {
  /**
   * Factory method for creating an Agent.
   */
  def apply[T](initialValue: T)(implicit context: ExecutionContext): Agent[T] = new SecretAgent(initialValue, context)

  /**
   * Java API: Factory method for creating an Agent.
   */
  def create[T](initialValue: T, context: ExecutionContext): Agent[T] = Agent(initialValue)(context)

  /**
   * Default agent implementation.
   */
  private final class SecretAgent[T](initialValue: T, context: ExecutionContext) extends Agent[T] {
    private val ref = Ref(initialValue)
    private val updater = SerializedSuspendableExecutionContext(10)(context)

    def get(): T = ref.single.get

    def send(newValue: T): Unit = withinTransaction(new Runnable { def run = ref.single.update(newValue) })

    def send(f: T ⇒ T): Unit = withinTransaction(new Runnable { def run = ref.single.transform(f) })

    def sendOff(f: T ⇒ T)(implicit ec: ExecutionContext): Unit = withinTransaction(
      new Runnable {
        def run =
          try updater.suspend() finally ec.execute(new Runnable { def run = try ref.single.transform(f) finally updater.resume() })
      })

    def alter(newValue: T): Future[T] = doAlter({ ref.single.update(newValue); newValue })

    def alter(f: T ⇒ T): Future[T] = doAlter(ref.single.transformAndGet(f))

    def alterOff(f: T ⇒ T)(implicit ec: ExecutionContext): Future[T] = {
      val result = Promise[T]()
      withinTransaction(new Runnable {
        def run = {
          updater.suspend()
          result completeWith Future(try ref.single.transformAndGet(f) finally updater.resume())
        }
      })
      result.future
    }

    /**
     * Internal helper method
     */
    private final def withinTransaction(run: Runnable): Unit = {
      Txn.findCurrent match {
        case Some(txn) ⇒ Txn.afterCommit(_ ⇒ updater.execute(run))(txn)
        case _         ⇒ updater.execute(run)
      }
    }

    /**
     * Internal helper method
     */
    private final def doAlter(f: ⇒ T): Future[T] = {
      Txn.findCurrent match {
        case Some(txn) ⇒
          val result = Promise[T]()
          Txn.afterCommit(status ⇒ result completeWith Future(f)(updater))(txn)
          result.future
        case _ ⇒ Future(f)(updater)
      }
    }

    def future(): Future[T] = Future(ref.single.get)(updater)

    def map[B](f: T ⇒ B): Agent[B] = Agent(f(get))(updater)

    def flatMap[B](f: T ⇒ Agent[B]): Agent[B] = f(get)

    def foreach[U](f: T ⇒ U): Unit = f(get)
  }
}

/**
 * The Agent class was inspired by agents in Clojure.
 *
 * Agents provide asynchronous change of individual locations. Agents
 * are bound to a single storage location for their lifetime, and only
 * allow mutation of that location (to a new state) to occur as a result
 * of an action. Update actions are functions that are asynchronously
 * applied to the Agent's state and whose return value becomes the
 * Agent's new state. The state of an Agent should be immutable.
 *
 * While updates to Agents are asynchronous, the state of an Agent is
 * always immediately available for reading by any thread (using ''get''
 * or ''apply'') without any messages.
 *
 * Agents are reactive. The update actions of all Agents get interleaved
 * amongst threads in a thread pool. At any point in time, at most one
 * ''send'' action for each Agent is being executed. Actions dispatched to
 * an agent from another thread will occur in the order they were sent,
 * potentially interleaved with actions dispatched to the same agent from
 * other sources.
 *
 * Example of usage:
 * {{{
 * val agent = Agent(5)
 *
 * agent send (_ * 2)
 *
 * ...
 *
 * val result = agent()
 * // use result ...
 *
 * }}}
 * <br/>
 *
 * Agent is also monadic, which means that you can compose operations using
 * for-comprehensions. In monadic usage the original agents are not touched
 * but new agents are created. So the old values (agents) are still available
 * as-is. They are so-called 'persistent'.
 * <br/><br/>
 *
 * Example of monadic usage:
 * {{{
 * val agent1 = Agent(3)
 * val agent2 = Agent(5)
 *
 * for (value <- agent1) {
 *   result = value + 1
 * }
 *
 * val agent3 = for (value <- agent1) yield value + 1
 *
 * val agent4 = for {
 *   value1 <- agent1
 *   value2 <- agent2
 * } yield value1 + value2
 *
 * }}}
 *
 * ==DEPRECATED STM SUPPORT==
 *
 * Agents participating in enclosing STM transaction is a deprecated feature in 2.3.
 *
 * If an Agent is used within an enclosing transaction, then it will
 * participate in that transaction. Agents are integrated with the STM -
 * any dispatches made in a transaction are held until that transaction
 * commits, and are discarded if it is retried or aborted.
 */
abstract class Agent[T] {

  /**
   * Java API: Read the internal state of the agent.
   */
  def get(): T

  /**
   * Read the internal state of the agent.
   */
  def apply(): T = get

  /**
   * Dispatch a new value for the internal state. Behaves the same
   * as sending a function (x => newValue).
   */
  def send(newValue: T): Unit

  /**
   * Dispatch a function to update the internal state.
   * In Java, pass in an instance of `akka.dispatch.Mapper`.
   */
  def send(f: T ⇒ T): Unit

  /**
   * Dispatch a function to update the internal state but on its own thread.
   * This does not use the reactive thread pool and can be used for long-running
   * or blocking operations. Dispatches using either `sendOff` or `send` will
   * still be executed in order.
   * In Java, pass in an instance of `akka.dispatch.Mapper`.
   */
  def sendOff(f: T ⇒ T)(implicit ec: ExecutionContext): Unit

  /**
   * Dispatch an update to the internal state, and return a Future where
   * that new state can be obtained.
   * In Java, pass in an instance of `akka.dispatch.Mapper`.
   */
  def alter(newValue: T): Future[T]

  /**
   * Dispatch a function to update the internal state, and return a Future where
   * that new state can be obtained.
   * In Java, pass in an instance of `akka.dispatch.Mapper`.
   */
  def alter(f: T ⇒ T): Future[T]

  /**
   * Dispatch a function to update the internal state but on its own thread,
   * and return a Future where that new state can be obtained.
   * This does not use the reactive thread pool and can be used for long-running
   * or blocking operations. Dispatches using either `alterOff` or `alter` will
   * still be executed in order.
   * In Java, pass in an instance of `akka.dispatch.Mapper`.
   */
  def alterOff(f: T ⇒ T)(implicit ec: ExecutionContext): Future[T]

  /**
   * A future to the current value that will be completed after any currently
   * queued updates.
   */
  def future(): Future[T]

  /**
   * Map this agent to a new agent, applying the function to the internal state.
   * Does not change the value of this agent.
   * In Java, pass in an instance of `akka.dispatch.Mapper`.
   */
  def map[B](f: T ⇒ B): Agent[B]

  /**
   * Flatmap this agent to a new agent, applying the function to the internal state.
   * Does not change the value of this agent.
   * In Java, pass in an instance of `akka.dispatch.Mapper`.
   */
  def flatMap[B](f: T ⇒ Agent[B]): Agent[B]

  /**
   * Applies the function to the internal state. Does not change the value of this agent.
   * In Java, pass in an instance of `akka.dispatch.Foreach`.
   */
  def foreach[U](f: T ⇒ U): Unit
}

Other Akka source code examples

Here is a short list of links related to this Akka Agent.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.