|
Akka/Scala example source code file (LeveldbRecovery.scala)
The LeveldbRecovery.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence.journal.leveldb
import scala.concurrent.Future
import akka.persistence._
import akka.persistence.journal.AsyncRecovery
import org.iq80.leveldb.DBIterator
/**
* INTERNAL API.
*
* LevelDB backed message replay and sequence number recovery.
*/
private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: LeveldbStore ⇒
import Key._
private lazy val replayDispatcherId = config.getString("replay-dispatcher")
private lazy val replayDispatcher = context.system.dispatchers.lookup(replayDispatcherId)
def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
val nid = numericId(persistenceId)
Future(readHighestSequenceNr(nid))(replayDispatcher)
}
def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] = {
val nid = numericId(persistenceId)
Future(replayMessages(nid, fromSequenceNr: Long, toSequenceNr, max: Long)(replayCallback))(replayDispatcher)
}
def replayMessages(persistenceId: Int, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Unit = {
@scala.annotation.tailrec
def go(iter: DBIterator, key: Key, ctr: Long, replayCallback: PersistentRepr ⇒ Unit) {
if (iter.hasNext) {
val nextEntry = iter.next()
val nextKey = keyFromBytes(nextEntry.getKey)
if (nextKey.sequenceNr > toSequenceNr) {
// end iteration here
} else if (nextKey.channelId != 0) {
// phantom confirmation (just advance iterator)
go(iter, nextKey, ctr, replayCallback)
} else if (key.persistenceId == nextKey.persistenceId) {
val msg = persistentFromBytes(nextEntry.getValue)
val del = deletion(iter, nextKey)
val cnf = confirms(iter, nextKey, Nil)
if (ctr < max) {
replayCallback(msg.update(confirms = cnf, deleted = del))
go(iter, nextKey, ctr + 1L, replayCallback)
}
}
}
}
@scala.annotation.tailrec
def confirms(iter: DBIterator, key: Key, channelIds: List[String]): List[String] = {
if (iter.hasNext) {
val nextEntry = iter.peekNext()
val nextKey = keyFromBytes(nextEntry.getKey)
if (key.persistenceId == nextKey.persistenceId && key.sequenceNr == nextKey.sequenceNr) {
val nextValue = new String(nextEntry.getValue, "UTF-8")
iter.next()
confirms(iter, nextKey, nextValue :: channelIds)
} else channelIds
} else channelIds
}
def deletion(iter: DBIterator, key: Key): Boolean = {
if (iter.hasNext) {
val nextEntry = iter.peekNext()
val nextKey = keyFromBytes(nextEntry.getKey)
if (key.persistenceId == nextKey.persistenceId && key.sequenceNr == nextKey.sequenceNr && isDeletionKey(nextKey)) {
iter.next()
true
} else false
} else false
}
withIterator { iter ⇒
val startKey = Key(persistenceId, if (fromSequenceNr < 1L) 1L else fromSequenceNr, 0)
iter.seek(keyToBytes(startKey))
go(iter, startKey, 0L, replayCallback)
}
}
def readHighestSequenceNr(persistenceId: Int) = {
val ro = leveldbSnapshot()
try {
leveldb.get(keyToBytes(counterKey(persistenceId)), ro) match {
case null ⇒ 0L
case bytes ⇒ counterFromBytes(bytes)
}
} finally {
ro.snapshot().close()
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka LeveldbRecovery.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.