|
Akka/Scala example source code file (LocalSnapshotStore.scala)
The LocalSnapshotStore.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2012-2013 Eligotech BV. */ package akka.persistence.snapshot.local import java.io._ import java.net.{ URLDecoder, URLEncoder } import scala.collection.immutable import scala.concurrent.Future import scala.util._ import akka.actor.ActorLogging import akka.persistence._ import akka.persistence.snapshot._ import akka.persistence.serialization._ import akka.serialization.SerializationExtension /** * INTERNAL API. * * Local filesystem backed snapshot store. */ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLogging { private val FilenamePattern = """^snapshot-(.+)-(\d+)-(\d+)""".r private val config = context.system.settings.config.getConfig("akka.persistence.snapshot-store.local") private val streamDispatcher = context.system.dispatchers.lookup(config.getString("stream-dispatcher")) private val snapshotDir = new File(config.getString("dir")) private val serializationExtension = SerializationExtension(context.system) private var saving = immutable.Set.empty[SnapshotMetadata] // saving in progress def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { // // Heuristics: // // Select youngest 3 snapshots that match upper bound. This may help in situations // where saving of a snapshot could not be completed because of a JVM crash. Hence, // an attempt to load that snapshot will fail but loading an older snapshot may // succeed. // // TODO: make number of loading attempts configurable // val metadata = snapshotMetadata(persistenceId, criteria).sorted.takeRight(3) Future(load(metadata))(streamDispatcher) } def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { saving += metadata Future(save(metadata, snapshot))(streamDispatcher) } def saved(metadata: SnapshotMetadata): Unit = { saving -= metadata } def delete(metadata: SnapshotMetadata): Unit = { saving -= metadata snapshotFile(metadata).delete() } def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) = { snapshotMetadata(persistenceId, criteria).foreach(delete) } @scala.annotation.tailrec private def load(metadata: immutable.Seq[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match { case None ⇒ None case Some(md) ⇒ Try(withInputStream(md)(deserialize)) match { case Success(s) ⇒ Some(SelectedSnapshot(md, s.data)) case Failure(e) ⇒ log.error(e, s"Error loading snapshot [${md}]") load(metadata.init) // try older snapshot } } protected def save(metadata: SnapshotMetadata, snapshot: Any): Unit = { val tmpFile = withOutputStream(metadata)(serialize(_, Snapshot(snapshot))) tmpFile.renameTo(snapshotFile(metadata)) } protected def deserialize(inputStream: InputStream): Snapshot = serializationExtension.deserialize(streamToBytes(inputStream), classOf[Snapshot]).get protected def serialize(outputStream: OutputStream, snapshot: Snapshot): Unit = outputStream.write(serializationExtension.findSerializerFor(snapshot).toBinary(snapshot)) protected def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) ⇒ Unit): File = { val tmpFile = snapshotFile(metadata, extension = "tmp") withStream(new BufferedOutputStream(new FileOutputStream(tmpFile)), p) tmpFile } private def withInputStream[T](metadata: SnapshotMetadata)(p: (InputStream) ⇒ T): T = withStream(new BufferedInputStream(new FileInputStream(snapshotFile(metadata))), p) private def withStream[A <: Closeable, B](stream: A, p: A ⇒ B): B = try { p(stream) } finally { stream.close() } private def snapshotFile(metadata: SnapshotMetadata, extension: String = ""): File = new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}${extension}") private def snapshotMetadata(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = snapshotDir.listFiles(new SnapshotFilenameFilter(persistenceId)).map(_.getName).collect { case FilenamePattern(pid, snr, tms) ⇒ SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong) }.filter(md ⇒ criteria.matches(md) && !saving.contains(md)).toVector override def preStart() { if (!snapshotDir.isDirectory) { // try to create the directory, on failure double check if someone else beat us to it if (!snapshotDir.mkdirs() && !snapshotDir.isDirectory) { throw new IOException(s"Failed to create snapshot directory [${snapshotDir.getCanonicalPath}]") } } super.preStart() } private class SnapshotFilenameFilter(persistenceId: String) extends FilenameFilter { def accept(dir: File, name: String): Boolean = name.startsWith(s"snapshot-${URLEncoder.encode(persistenceId, "UTF-8")}") } } Other Akka source code examplesHere is a short list of links related to this Akka LocalSnapshotStore.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.