
Akka/Scala example source code file (BalancingDispatcher.scala)

This example Akka source code file (BalancingDispatcher.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorcell, actorsystemimpl, akka, balancingdispatcher, boolean, concurrent, dispatch, duration, envelope, loadmetrics, mailbox, mailboxtype, messagequeue, time, unit

The BalancingDispatcher.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.dispatch

import akka.actor.{ ActorCell, ActorRef }
import akka.dispatch.sysmsg._
import scala.annotation.tailrec
import scala.concurrent.duration.Duration
import akka.util.Helpers
import java.util.{ Comparator, Iterator }
import java.util.concurrent.{ Executor, LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet }
import akka.actor.ActorSystemImpl
import scala.concurrent.duration.FiniteDuration

/**
 * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
 * that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the
 * actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
 * <p/>
 * Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
 * best described as "work donating" because the actor of which work is being stolen takes the initiative.
 * <p/>
 * The preferred way of creating dispatchers is to define them in configuration and use the
 * `lookup` method in [[akka.dispatch.Dispatchers]].
 *
 * @see akka.dispatch.BalancingDispatcher
 * @see akka.dispatch.Dispatchers
 */
@deprecated("Use BalancingPool instead of BalancingDispatcher", "2.3")
class BalancingDispatcher(
  _configurator: MessageDispatcherConfigurator,
  _id: String,
  throughput: Int,
  throughputDeadlineTime: Duration,
  _mailboxType: MailboxType,
  _executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
  _shutdownTimeout: FiniteDuration,
  attemptTeamWork: Boolean)
  extends Dispatcher(_configurator, _id, throughput, throughputDeadlineTime, _executorServiceFactoryProvider, _shutdownTimeout) {

  /**
   * INTERNAL API
   */
  private[akka] val team = new ConcurrentSkipListSet[ActorCell](
    Helpers.identityHashComparator(new Comparator[ActorCell] {
      def compare(l: ActorCell, r: ActorCell) = l.self.path compareTo r.self.path
    }))

  /**
   * INTERNAL API
   */
  private[akka] val messageQueue: MessageQueue = _mailboxType.create(None, None)

  private class SharingMailbox(val system: ActorSystemImpl, _messageQueue: MessageQueue)
    extends Mailbox(_messageQueue) with DefaultSystemMessageQueue {
    override def cleanUp(): Unit = {
      val dlq = mailboxes.deadLetterMailbox
      //Don't call the original implementation of this since it scraps all messages, and we don't want to do that
      var messages = systemDrain(new LatestFirstSystemMessageList(NoMessage))
      while (messages.nonEmpty) {
        // message must be “virgin” before being able to systemEnqueue again
        val message = messages.head
        messages = messages.tail
        message.unlink()
        dlq.systemEnqueue(system.deadLetters, message)
      }
    }
  }

  protected[akka] override def createMailbox(actor: akka.actor.Cell, mailboxType: MailboxType): Mailbox =
    new SharingMailbox(actor.systemImpl, messageQueue)

  protected[akka] override def register(actor: ActorCell): Unit = {
    super.register(actor)
    team.add(actor)
  }

  protected[akka] override def unregister(actor: ActorCell): Unit = {
    team.remove(actor)
    super.unregister(actor)
    teamWork()
  }

  override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
    messageQueue.enqueue(receiver.self, invocation)
    if (!registerForExecution(receiver.mailbox, false, false)) teamWork()
  }

  protected def teamWork(): Unit =
    if (attemptTeamWork) {
      @tailrec def scheduleOne(i: Iterator[ActorCell] = team.iterator): Unit =
        if (messageQueue.hasMessages
          && i.hasNext
          && (executorService.executor match {
            case lm: LoadMetrics ⇒ lm.atFullThrottle == false
            case other           ⇒ true
          })
          && !registerForExecution(i.next.mailbox, false, false))
          scheduleOne(i)

      scheduleOne()
    }
}
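
The class comment above calls this technique "work donating": every actor on the dispatcher shares one message queue, and when the addressed actor cannot be scheduled, `teamWork()` walks the team looking for an idle member to run instead. The following is a minimal, single-threaded sketch of that idea using only the standard library. It is not the Akka implementation: `Worker`, `SharedQueueModel`, `runIfIdle`, and the `busy` flag are hypothetical names I made up for illustration, and an idle worker here simply processes one message immediately instead of being scheduled on an executor.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an actor: it only records what it processed.
final class Worker(val name: String) {
  var busy: Boolean = false
  val processed: ArrayBuffer[String] = ArrayBuffer.empty[String]
}

// Simplified model of a dispatcher whose workers all share one queue.
final class SharedQueueModel(team: Seq[Worker]) {
  private val queue = new ConcurrentLinkedQueue[String]()

  // Like dispatch() above: enqueue on the shared queue, try the addressed
  // worker first, and fall back to teamWork() if it cannot run.
  def dispatch(receiver: Worker, msg: String): Unit = {
    queue.add(msg)
    if (!runIfIdle(receiver)) teamWork()
  }

  // Assumption: stands in for registerForExecution. Returns false when the
  // worker is busy; otherwise the worker takes one message from the queue.
  private def runIfIdle(w: Worker): Boolean =
    if (w.busy) false
    else {
      val msg = queue.poll()
      if (msg != null) w.processed += msg
      true
    }

  // Like scheduleOne() above: stop at the first teammate that could run,
  // or when the queue is empty or the team is exhausted.
  @tailrec private def scheduleOne(i: Iterator[Worker]): Unit =
    if (!queue.isEmpty && i.hasNext && !runIfIdle(i.next())) scheduleOne(i)

  def teamWork(): Unit = scheduleOne(team.iterator)
}

object SharedQueueDemo {
  def main(args: Array[String]): Unit = {
    val a = new Worker("a")
    val b = new Worker("b")
    val model = new SharedQueueModel(Seq(a, b))
    a.busy = true                 // the addressed worker cannot run
    model.dispatch(a, "job-1")    // so the message is picked up by b
    println(s"a: ${a.processed.mkString(",")} b: ${b.processed.mkString(",")}")
  }
}
```

In this model a message addressed to the busy worker `a` ends up processed by the idle worker `b`, which is the behavior the class comment describes: the client gets no guarantee about which pool member handles a given message.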

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
