alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (LocalSnapshotStore.scala)

This example Akka source code file (LocalSnapshotStore.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

akka, b, collection, concurrent, file, future, inputstream, persistence, serialization, serialize, snapshot, snapshotmetadata, snapshotselectioncriteria, string, unit, utf-8

The LocalSnapshotStore.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 * Copyright (C) 2012-2013 Eligotech BV.
 */

package akka.persistence.snapshot.local

import java.io._
import java.net.{ URLDecoder, URLEncoder }

import scala.collection.immutable
import scala.concurrent.Future
import scala.util._

import akka.actor.ActorLogging
import akka.persistence._
import akka.persistence.snapshot._
import akka.persistence.serialization._
import akka.serialization.SerializationExtension

/**
 * INTERNAL API.
 *
 * Local filesystem backed snapshot store.
 */
private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLogging {
  private val FilenamePattern = """^snapshot-(.+)-(\d+)-(\d+)""".r

  private val config = context.system.settings.config.getConfig("akka.persistence.snapshot-store.local")
  private val streamDispatcher = context.system.dispatchers.lookup(config.getString("stream-dispatcher"))
  private val snapshotDir = new File(config.getString("dir"))

  private val serializationExtension = SerializationExtension(context.system)
  private var saving = immutable.Set.empty[SnapshotMetadata] // saving in progress

  def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
    //
    // Heuristics:
    //
    // Select youngest 3 snapshots that match upper bound. This may help in situations
    // where saving of a snapshot could not be completed because of a JVM crash. Hence,
    // an attempt to load that snapshot will fail but loading an older snapshot may
    // succeed.
    //
    // TODO: make number of loading attempts configurable
    //
    val metadata = snapshotMetadata(persistenceId, criteria).sorted.takeRight(3)
    Future(load(metadata))(streamDispatcher)
  }

  def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
    saving += metadata
    Future(save(metadata, snapshot))(streamDispatcher)
  }

  def saved(metadata: SnapshotMetadata): Unit = {
    saving -= metadata
  }

  def delete(metadata: SnapshotMetadata): Unit = {
    saving -= metadata
    snapshotFile(metadata).delete()
  }

  def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) = {
    snapshotMetadata(persistenceId, criteria).foreach(delete)
  }

  @scala.annotation.tailrec
  private def load(metadata: immutable.Seq[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match {
    case None ⇒ None
    case Some(md) ⇒
      Try(withInputStream(md)(deserialize)) match {
        case Success(s) ⇒ Some(SelectedSnapshot(md, s.data))
        case Failure(e) ⇒
          log.error(e, s"Error loading snapshot [${md}]")
          load(metadata.init) // try older snapshot
      }
  }

  protected def save(metadata: SnapshotMetadata, snapshot: Any): Unit = {
    val tmpFile = withOutputStream(metadata)(serialize(_, Snapshot(snapshot)))
    tmpFile.renameTo(snapshotFile(metadata))
  }

  protected def deserialize(inputStream: InputStream): Snapshot =
    serializationExtension.deserialize(streamToBytes(inputStream), classOf[Snapshot]).get

  protected def serialize(outputStream: OutputStream, snapshot: Snapshot): Unit =
    outputStream.write(serializationExtension.findSerializerFor(snapshot).toBinary(snapshot))

  protected def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) ⇒ Unit): File = {
    val tmpFile = snapshotFile(metadata, extension = "tmp")
    withStream(new BufferedOutputStream(new FileOutputStream(tmpFile)), p)
    tmpFile
  }

  private def withInputStream[T](metadata: SnapshotMetadata)(p: (InputStream) ⇒ T): T =
    withStream(new BufferedInputStream(new FileInputStream(snapshotFile(metadata))), p)

  private def withStream[A <: Closeable, B](stream: A, p: A ⇒ B): B =
    try { p(stream) } finally { stream.close() }

  private def snapshotFile(metadata: SnapshotMetadata, extension: String = ""): File =
    new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}${extension}")

  private def snapshotMetadata(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] =
    snapshotDir.listFiles(new SnapshotFilenameFilter(persistenceId)).map(_.getName).collect {
      case FilenamePattern(pid, snr, tms) ⇒ SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong)
    }.filter(md ⇒ criteria.matches(md) && !saving.contains(md)).toVector

  override def preStart() {
    if (!snapshotDir.isDirectory) {
      // try to create the directory, on failure double check if someone else beat us to it
      if (!snapshotDir.mkdirs() && !snapshotDir.isDirectory) {
        throw new IOException(s"Failed to create snapshot directory [${snapshotDir.getCanonicalPath}]")
      }
    }
    super.preStart()
  }

  private class SnapshotFilenameFilter(persistenceId: String) extends FilenameFilter {
    def accept(dir: File, name: String): Boolean = name.startsWith(s"snapshot-${URLEncoder.encode(persistenceId, "UTF-8")}")
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka LocalSnapshotStore.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.