alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (InmemJournal.scala)

This example Akka source code file (InmemJournal.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

akka, collection, concurrent, deletemessages, deletemessagesto, inmemmessages, journal, long, none, persistence, persistentrepr, some, string, time, vector, writemessages

The InmemJournal.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.persistence.journal.inmem

import scala.collection.immutable
import scala.concurrent.duration._
import scala.language.postfixOps

import akka.actor._
import akka.persistence._
import akka.persistence.journal.AsyncWriteProxy
import akka.persistence.journal.AsyncWriteTarget
import akka.util.Timeout

/**
 * INTERNAL API.
 *
 * In-memory journal for testing purposes only.
 *
 * The journal itself holds no messages: when it starts it creates a child
 * [[InmemStore]] actor and registers it with the proxy by sending itself a
 * `SetStore` message.
 */
private[persistence] class InmemJournal extends AsyncWriteProxy {
  import AsyncWriteProxy.SetStore

  // NOTE(review): presumably the ask timeout AsyncWriteProxy applies to store requests — confirm.
  val timeout: Timeout = Timeout(5.seconds)

  /** Runs the inherited start-up logic, then spawns the backing store and registers it. */
  override def preStart(): Unit = {
    super.preStart()
    val store = context.actorOf(Props[InmemStore])
    self ! SetStore(store)
  }
}

/**
 * INTERNAL API.
 *
 * In-memory message storage keyed by persistence id. Each persistence id maps to
 * the vector of its messages in the order they were added.
 *
 * State lives in a plain `var` and no synchronization is performed here.
 */
private[persistence] trait InmemMessages {
  // persistenceId -> persistent messages, in insertion order
  var messages: Map[String, Vector[PersistentRepr]] = Map.empty

  /** Appends `p` to the messages stored under `p.persistenceId`, creating the entry if needed. */
  def add(p: PersistentRepr): Unit = {
    val existing = messages.getOrElse(p.persistenceId, Vector.empty[PersistentRepr])
    messages = messages + (p.persistenceId -> (existing :+ p))
  }

  /**
   * Replaces every message of `pid` whose sequence number equals `snr` with `f(message)`.
   * Does nothing when `pid` has no entry.
   */
  def update(pid: String, snr: Long)(f: PersistentRepr ⇒ PersistentRepr): Unit =
    messages.get(pid).foreach { ms ⇒
      messages = messages + (pid -> ms.map(sp ⇒ if (sp.sequenceNr == snr) f(sp) else sp))
    }

  /**
   * Removes every message of `pid` whose sequence number equals `snr`.
   * The entry for `pid` is kept even if it becomes empty; unknown `pid`s are ignored.
   */
  def delete(pid: String, snr: Long): Unit =
    messages.get(pid).foreach { ms ⇒
      messages = messages + (pid -> ms.filterNot(_.sequenceNr == snr))
    }

  /**
   * Returns at most `max` messages of `pid` with `fromSnr <= sequenceNr <= toSnr`,
   * in stored order. Returns an empty sequence for an unknown `pid` or a
   * non-positive `max`.
   */
  def read(pid: String, fromSnr: Long, toSnr: Long, max: Long): immutable.Seq[PersistentRepr] =
    messages.get(pid) match {
      case Some(ms) ⇒ ms.filter(m ⇒ m.sequenceNr >= fromSnr && m.sequenceNr <= toSnr).take(safeLongToInt(max))
      case None     ⇒ Nil
    }

  /**
   * Returns the sequence number of the last stored message of `pid`, or `0L` when
   * `pid` is unknown or has no messages left. This relies on messages having been
   * added in ascending sequence-number order.
   */
  def highestSequenceNr(pid: String): Long = {
    val snro = for {
      ms ← messages.get(pid)
      m ← ms.lastOption
    } yield m.sequenceNr
    snro.getOrElse(0L)
  }

  /**
   * Clamps `l` into `[0, Int.MaxValue]`.
   *
   * The lower bound matters: a plain `toInt` wraps for values below `Int.MinValue`
   * (e.g. `-4294967295L.toInt == 1`), which would turn a negative limit into a
   * positive `take` count.
   */
  private def safeLongToInt(l: Long): Int =
    if (l >= Int.MaxValue) Int.MaxValue
    else if (l <= 0L) 0
    else l.toInt
}

/**
 * INTERNAL API.
 *
 * Actor that keeps journal messages in memory (through [[InmemMessages]]) and
 * handles the `AsyncWriteTarget` commands.
 *
 * Write, confirmation and delete commands are answered with `()`: the reply is the
 * `Unit` result of the `foreach` that applied the change.
 * NOTE(review): presumably the proxy waits for that `Unit` as an acknowledgement — confirm
 * against AsyncWriteProxy.
 */
private[persistence] class InmemStore extends Actor with InmemMessages {
  import AsyncWriteTarget._

  def receive = {
    case WriteMessages(msgs) ⇒
      sender() ! msgs.foreach(add)
    case WriteConfirmations(cnfs) ⇒
      // Record the confirming channel on the matching message by prepending its id.
      sender() ! cnfs.foreach(cnf ⇒ update(cnf.persistenceId, cnf.sequenceNr)(p ⇒ p.update(confirms = cnf.channelId +: p.confirms)))
    case DeleteMessages(msgIds, false) ⇒
      // Non-permanent delete: keep the message but flag it as deleted.
      sender() ! msgIds.foreach(msgId ⇒ update(msgId.persistenceId, msgId.sequenceNr)(_.update(deleted = true)))
    case DeleteMessages(msgIds, true) ⇒
      // Permanent delete: drop the message from storage.
      sender() ! msgIds.foreach(msgId ⇒ delete(msgId.persistenceId, msgId.sequenceNr))
    case DeleteMessagesTo(pid, tsnr, false) ⇒
      sender() ! storedSequenceNrsUpTo(pid, tsnr).foreach { snr ⇒ update(pid, snr)(_.update(deleted = true)) }
    case DeleteMessagesTo(pid, tsnr, true) ⇒
      sender() ! storedSequenceNrsUpTo(pid, tsnr).foreach { snr ⇒ delete(pid, snr) }
    case ReplayMessages(pid, fromSnr, toSnr, max) ⇒
      // Stream the selected messages one by one, then signal the end of the replay.
      read(pid, fromSnr, toSnr, max).foreach(sender() ! _)
      sender() ! ReplaySuccess
    case ReadHighestSequenceNr(persistenceId, _) ⇒
      sender() ! highestSequenceNr(persistenceId)
  }

  /**
   * Sequence numbers currently stored for `pid` that lie in `[1, toSnr]`, without duplicates.
   *
   * Used instead of iterating `1L to toSnr`: that range cannot be traversed once it holds
   * more than `Int.MaxValue` elements (e.g. `toSnr == Long.MaxValue`) and, even below that
   * limit, costs one full pass over the stored messages per number in the range. Numbers
   * with no stored message have no effect on `update`/`delete`, so visiting only the stored
   * ones produces the same result. The snapshot is materialized up front, so mutating
   * `messages` while iterating over it is safe.
   */
  private def storedSequenceNrsUpTo(pid: String, toSnr: Long): Vector[Long] =
    messages.getOrElse(pid, Vector.empty[PersistentRepr]).collect {
      case m if m.sequenceNr >= 1L && m.sequenceNr <= toSnr ⇒ m.sequenceNr
    }.distinct
}

Other Akka source code examples

Here is a short list of links related to this Akka InmemJournal.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.