|
Akka/Scala example source code file (LeveldbStore.scala)
// The LeveldbStore.scala Akka example source code
/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 * Copyright (C) 2012-2013 Eligotech BV.
 */

package akka.persistence.journal.leveldb

import java.io.File

import scala.annotation.tailrec
import scala.collection.immutable
import scala.util._

import org.iq80.leveldb._

import akka.actor._
import akka.persistence._
import akka.persistence.journal.AsyncWriteTarget
import akka.serialization.SerializationExtension

/**
 * INTERNAL API.
 *
 * Actor mixin that keeps persistent messages, confirmations and deletion
 * markers in a LevelDB database. The database is opened in `preStart` and
 * closed in `postStop`; all settings are read from the configuration section
 * named by `configPath`.
 */
private[persistence] trait LeveldbStore extends Actor with LeveldbIdMapping with LeveldbRecovery {
  /** Path of the configuration section holding the `native`, `checksum`, `fsync` and `dir` settings. */
  val configPath: String

  val config = context.system.settings.config.getConfig(configPath)
  val nativeLeveldb = config.getBoolean("native")

  val leveldbOptions = new Options().createIfMissing(true)
  // A `def` on purpose: `leveldbSnapshot()` attaches a snapshot to the returned
  // instance, so every caller needs its own fresh ReadOptions.
  def leveldbReadOptions = new ReadOptions().verifyChecksums(config.getBoolean("checksum"))
  val leveldbWriteOptions = new WriteOptions().sync(config.getBoolean("fsync")).snapshot(false)
  val leveldbDir = new File(config.getString("dir"))
  // Assigned in preStart(); not usable before the actor has been started.
  var leveldb: DB = _

  /** Selects the JNI-backed factory when `native` is enabled, the pure-Java one otherwise. */
  def leveldbFactory =
    if (nativeLeveldb) org.fusesource.leveldbjni.JniDBFactory.factory
    else org.iq80.leveldb.impl.Iq80DBFactory.factory

  val serialization = SerializationExtension(context.system)

  import Key._

  /** Writes all `messages` (and their sequence number counters) in a single batch. */
  def writeMessages(messages: immutable.Seq[PersistentRepr]): Unit =
    withBatch(batch ⇒ messages.foreach(message ⇒ addToMessageBatch(message, batch)))

  /** Writes all `confirmations` in a single batch. */
  def writeConfirmations(confirmations: immutable.Seq[PersistentConfirmation]): Unit =
    withBatch(batch ⇒ confirmations.foreach(confirmation ⇒ addToConfirmationBatch(confirmation, batch)))

  /**
   * Deletes the messages identified by `messageIds` in a single batch.
   *
   * @param permanent if `true` the message entries are removed, otherwise an
   *                  (empty) deletion marker is written next to each message.
   */
  def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Unit =
    withBatch { batch ⇒
      messageIds foreach { messageId ⇒
        val nid = numericId(messageId.persistenceId)
        if (permanent) batch.delete(keyToBytes(Key(nid, messageId.sequenceNr, 0)))
        else batch.put(keyToBytes(deletionKey(nid, messageId.sequenceNr)), Array.emptyByteArray)
      }
    }

  /**
   * Deletes all messages of `persistenceId` up to and including `toSequenceNr`
   * in a single batch.
   *
   * The upper bound is clamped to the value reported by `readHighestSequenceNr`
   * for this id, so a caller passing a very large bound (for example
   * `Long.MaxValue`) no longer makes the store enumerate every sequence number
   * up to that bound.
   *
   * @param permanent if `true` the message entries are removed, otherwise an
   *                  (empty) deletion marker is written for each sequence number.
   */
  def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit =
    withBatch { batch ⇒
      val nid = numericId(persistenceId)

      // seek to first existing message
      val fromSequenceNr = withIterator { iter ⇒
        val startKey = Key(nid, 1L, 0)
        iter.seek(keyToBytes(startKey))
        if (iter.hasNext) keyFromBytes(iter.peekNext().getKey).sequenceNr else Long.MaxValue
      }

      // Never walk past what has actually been stored for this id.
      val lastSequenceNr = math.min(toSequenceNr, readHighestSequenceNr(nid))

      // Explicit recursion instead of `from to until foreach`: no limit on the
      // number of elements and no overflow when the bound is Long.MaxValue.
      @tailrec def mark(sequenceNr: Long): Unit =
        if (sequenceNr <= lastSequenceNr) {
          if (permanent) batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) // TODO: delete confirmations and deletion markers, if any.
          else batch.put(keyToBytes(deletionKey(nid, sequenceNr)), Array.emptyByteArray)
          if (sequenceNr != lastSequenceNr) mark(sequenceNr + 1)
        }

      mark(fromSequenceNr)
    }

  /** Creates read options bound to a new snapshot of the database; the caller must close that snapshot. */
  def leveldbSnapshot(): ReadOptions = leveldbReadOptions.snapshot(leveldb.getSnapshot)

  /**
   * Runs `body` with an iterator over a fresh snapshot and returns its result.
   * Both the iterator and the snapshot are closed afterwards, the snapshot even
   * when closing the iterator fails.
   */
  def withIterator[R](body: DBIterator ⇒ R): R = {
    val ro = leveldbSnapshot()
    try {
      val iterator = leveldb.iterator(ro)
      try body(iterator) finally iterator.close()
    } finally {
      ro.snapshot().close()
    }
  }

  /**
   * Runs `body` with a new write batch, writes the batch to the database if
   * `body` completed normally and returns the result of `body`. The batch is
   * always closed.
   */
  def withBatch[R](body: WriteBatch ⇒ R): R = {
    val batch = leveldb.createWriteBatch()
    try {
      val r = body(batch)
      leveldb.write(batch, leveldbWriteOptions)
      r
    } finally {
      batch.close()
    }
  }

  /** Serializes `p`; throws if serialization fails. */
  def persistentToBytes(p: PersistentRepr): Array[Byte] = serialization.serialize(p).get

  /** Deserializes a `PersistentRepr` from `a`; throws if deserialization fails. */
  def persistentFromBytes(a: Array[Byte]): PersistentRepr = serialization.deserialize(a, classOf[PersistentRepr]).get

  // Adds the counter update and the message entry itself to `batch`.
  private def addToMessageBatch(persistent: PersistentRepr, batch: WriteBatch): Unit = {
    val nid = numericId(persistent.persistenceId)
    batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr))
    batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent))
  }

  // Adds a confirmation entry, keyed by persistence id, sequence number and channel id, to `batch`.
  private def addToConfirmationBatch(confirmation: PersistentConfirmation, batch: WriteBatch): Unit = {
    val npid = numericId(confirmation.persistenceId)
    val ncid = numericId(confirmation.channelId)
    batch.put(keyToBytes(Key(npid, confirmation.sequenceNr, ncid)), confirmation.channelId.getBytes("UTF-8"))
  }

  /** Opens the database; compression is switched off when the non-native implementation is used. */
  override def preStart(): Unit = {
    leveldb = leveldbFactory.open(leveldbDir, if (nativeLeveldb) leveldbOptions else leveldbOptions.compressionType(CompressionType.NONE))
    super.preStart()
  }

  /** Closes the database; `super.postStop()` runs even if closing fails. */
  override def postStop(): Unit =
    try leveldb.close()
    finally super.postStop()
}

/**
 * A LevelDB store that can be shared by multiple actor systems. The shared store must be
 * set for each actor system that uses the store via `SharedLeveldbJournal.setStore`. The
 * shared LevelDB store is for testing only.
 */
class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.leveldb-shared.store" } with LeveldbStore {
  import AsyncWriteTarget._

  def receive = {
    case WriteMessages(msgs) ⇒ sender() ! writeMessages(msgs)
    case WriteConfirmations(cnfs) ⇒ sender() ! writeConfirmations(cnfs)
    case DeleteMessages(messageIds, permanent) ⇒ sender() ! deleteMessages(messageIds, permanent)
    case DeleteMessagesTo(pid, tsnr, permanent) ⇒ sender() ! deleteMessagesTo(pid, tsnr, permanent)
    case ReadHighestSequenceNr(pid, fromSequenceNr) ⇒ sender() ! readHighestSequenceNr(numericId(pid))
    case ReplayMessages(pid, fromSnr, toSnr, max) ⇒
      // Every replayed message and the final outcome go to the same requester.
      val replyTo = sender()
      Try(replayMessages(numericId(pid), fromSnr, toSnr, max)(replyTo ! _)) match {
        case Success(_)     ⇒ replyTo ! ReplaySuccess
        case Failure(cause) ⇒ replyTo ! ReplayFailure(cause)
      }
  }
}

// Other Akka source code examples
// Here is a short list of links related to this Akka LeveldbStore.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.