alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (UnnestedReceives.scala)

This example Akka source code file (UnnestedReceives.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, akka, collection, goahead, here, mutable, our, replay, switch, then, unit, unnestedreceives, when, while

The UnnestedReceives.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package docs.actor

import akka.actor._
import scala.collection.mutable.ListBuffer

/**
 * Requirements are as follows:
 * The first thing the actor needs to do, is to subscribe to a channel of events,
 * Then it must replay (process) all "old" events
 * Then it has to wait for a GoAhead signal to begin processing the new events
 * It mustn't "miss" events that happen between catching up with the old events and getting the GoAhead signal
 */
class UnnestedReceives extends Actor {
  import context.become
  //If you need to store sender/senderFuture you can change it to ListBuffer[(Any, Channel)]
  val queue = new ListBuffer[Any]()

  //This message processes a message/event
  def process(msg: Any): Unit = println("processing: " + msg)
  //This method subscribes the actor to the event bus
  def subscribe() {} //Your external stuff
  //This method retrieves all prior messages/events
  def allOldMessages() = List()

  override def preStart {
    //We override preStart to be sure that the first message the actor gets is
    //'Replay, that message will start to be processed _after_ the actor is started
    self ! 'Replay
    //Then we subscribe to the stream of messages/events
    subscribe()
  }

  def receive = {
    case 'Replay => //Our first message should be a 'Replay message, all others are invalid
      allOldMessages() foreach process //Process all old messages/events
      become { //Switch behavior to look for the GoAhead signal
        case 'GoAhead => //When we get the GoAhead signal we process all our buffered messages/events
          queue foreach process
          queue.clear
          become { //Then we change behaviour to process incoming messages/events as they arrive
            case msg => process(msg)
          }
        case msg => //While we haven't gotten the GoAhead signal, buffer all incoming messages
          queue += msg //Here you have full control, you can handle overflow etc
      }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka UnnestedReceives.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.