|
Akka/Scala example source code file (InmemJournal.scala)
The InmemJournal.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.persistence.journal.inmem import scala.collection.immutable import scala.concurrent.duration._ import scala.language.postfixOps import akka.actor._ import akka.persistence._ import akka.persistence.journal.AsyncWriteProxy import akka.persistence.journal.AsyncWriteTarget import akka.util.Timeout /** * INTERNAL API. * * In-memory journal for testing purposes only. */ private[persistence] class InmemJournal extends AsyncWriteProxy { import AsyncWriteProxy.SetStore val timeout = Timeout(5 seconds) override def preStart(): Unit = { super.preStart() self ! SetStore(context.actorOf(Props[InmemStore])) } } /** * INTERNAL API. */ private[persistence] trait InmemMessages { // persistenceId -> persistent message var messages = Map.empty[String, Vector[PersistentRepr]] def add(p: PersistentRepr) = messages = messages + (messages.get(p.persistenceId) match { case Some(ms) ⇒ p.persistenceId -> (ms :+ p) case None ⇒ p.persistenceId -> Vector(p) }) def update(pid: String, snr: Long)(f: PersistentRepr ⇒ PersistentRepr) = messages = messages.get(pid) match { case Some(ms) ⇒ messages + (pid -> ms.map(sp ⇒ if (sp.sequenceNr == snr) f(sp) else sp)) case None ⇒ messages } def delete(pid: String, snr: Long) = messages = messages.get(pid) match { case Some(ms) ⇒ messages + (pid -> ms.filterNot(_.sequenceNr == snr)) case None ⇒ messages } def read(pid: String, fromSnr: Long, toSnr: Long, max: Long): immutable.Seq[PersistentRepr] = messages.get(pid) match { case Some(ms) ⇒ ms.filter(m ⇒ m.sequenceNr >= fromSnr && m.sequenceNr <= toSnr).take(safeLongToInt(max)) case None ⇒ Nil } def highestSequenceNr(pid: String): Long = { val snro = for { ms ← messages.get(pid) m ← ms.lastOption } yield m.sequenceNr snro.getOrElse(0L) } private def safeLongToInt(l: Long): Int = if (Int.MaxValue < l) Int.MaxValue else l.toInt } /** * INTERNAL API. */ private[persistence] class InmemStore extends Actor with InmemMessages { import AsyncWriteTarget._ def receive = { case WriteMessages(msgs) ⇒ sender() ! msgs.foreach(add) case WriteConfirmations(cnfs) ⇒ sender() ! cnfs.foreach(cnf ⇒ update(cnf.persistenceId, cnf.sequenceNr)(p ⇒ p.update(confirms = cnf.channelId +: p.confirms))) case DeleteMessages(msgIds, false) ⇒ sender() ! msgIds.foreach(msgId ⇒ update(msgId.persistenceId, msgId.sequenceNr)(_.update(deleted = true))) case DeleteMessages(msgIds, true) ⇒ sender() ! msgIds.foreach(msgId ⇒ delete(msgId.persistenceId, msgId.sequenceNr)) case DeleteMessagesTo(pid, tsnr, false) ⇒ sender() ! (1L to tsnr foreach { snr ⇒ update(pid, snr)(_.update(deleted = true)) }) case DeleteMessagesTo(pid, tsnr, true) ⇒ sender() ! (1L to tsnr foreach { snr ⇒ delete(pid, snr) }) case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ read(pid, fromSnr, toSnr, max).foreach(sender() ! _) sender() ! ReplaySuccess case ReadHighestSequenceNr(persistenceId, _) ⇒ sender() ! highestSequenceNr(persistenceId) } } Other Akka source code examplesHere is a short list of links related to this Akka InmemJournal.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.