|
Akka/Scala example source code file (ChaosJournal.scala)
The ChaosJournal.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal.chaos
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.persistence._
import akka.persistence.journal.SyncWriteJournal
import akka.persistence.journal.inmem.InmemMessages
class WriteFailedException(ps: Seq[PersistentRepr])
extends TestException(s"write failed for payloads = [${ps.map(_.payload)}]")
class ConfirmFailedException(cs: Seq[PersistentConfirmation])
extends TestException(s"write failed for confirmations = [${cs.map(c ⇒ s"${c.persistenceId}-${c.sequenceNr}-${c.channelId}")}]")
class ReplayFailedException(ps: Seq[PersistentRepr])
extends TestException(s"recovery failed after replaying payloads = [${ps.map(_.payload)}]")
class ReadHighestFailedException
extends TestException(s"recovery failed when reading highest sequence number")
class DeleteFailedException(messageIds: immutable.Seq[PersistentId])
extends TestException(s"delete failed for message ids = [${messageIds}]")
/**
* Keep [[ChaosJournal]] state in an external singleton so that it survives journal restarts.
* The journal itself uses a dedicated dispatcher, so there won't be any visibility issues.
*/
private object ChaosJournalMessages extends InmemMessages
class ChaosJournal extends SyncWriteJournal {
import ChaosJournalMessages.{ delete ⇒ del, _ }
val config = context.system.settings.config.getConfig("akka.persistence.journal.chaos")
val writeFailureRate = config.getDouble("write-failure-rate")
val confirmFailureRate = config.getDouble("confirm-failure-rate")
val deleteFailureRate = config.getDouble("delete-failure-rate")
val replayFailureRate = config.getDouble("replay-failure-rate")
val readHighestFailureRate = config.getDouble("read-highest-failure-rate")
def random = ThreadLocalRandom.current
def writeMessages(messages: immutable.Seq[PersistentRepr]): Unit =
if (shouldFail(writeFailureRate)) throw new WriteFailedException(messages)
else messages.foreach(add)
def writeConfirmations(confirmations: immutable.Seq[PersistentConfirmation]): Unit =
if (shouldFail(confirmFailureRate)) throw new ConfirmFailedException(confirmations)
else confirmations.foreach(cnf ⇒ update(cnf.persistenceId, cnf.sequenceNr)(p ⇒ p.update(confirms = cnf.channelId +: p.confirms)))
def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Unit =
if (shouldFail(deleteFailureRate)) throw new DeleteFailedException(messageIds)
else if (permanent) messageIds.foreach(mid ⇒ update(mid.persistenceId, mid.sequenceNr)(_.update(deleted = true)))
else messageIds.foreach(mid ⇒ del(mid.persistenceId, mid.sequenceNr))
def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit =
(1L to toSequenceNr).map(PersistentIdImpl(persistenceId, _))
def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Unit] =
if (shouldFail(replayFailureRate)) {
val rm = read(persistenceId, fromSequenceNr, toSequenceNr, max)
val sm = rm.take(random.nextInt(rm.length + 1))
sm.foreach(replayCallback)
Future.failed(new ReplayFailedException(sm))
} else {
read(persistenceId, fromSequenceNr, toSequenceNr, max).foreach(replayCallback)
Future.successful(())
}
def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
if (shouldFail(readHighestFailureRate)) Future.failed(new ReadHighestFailedException)
else Future.successful(highestSequenceNr(persistenceId))
def shouldFail(rate: Double): Boolean =
random.nextDouble() < rate
}
Other Akka source code examplesHere is a short list of links related to this Akka ChaosJournal.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.