|
Akka/Scala example source code file (SharedLeveldbJournalSpec.scala)
The SharedLeveldbJournalSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.persistence.journal.leveldb import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor._ import akka.persistence._ import akka.testkit.{ TestProbe, AkkaSpec } object SharedLeveldbJournalSpec { val config = """ akka { actor { provider = "akka.remote.RemoteActorRefProvider" } persistence { journal { plugin = "akka.persistence.journal.leveldb-shared" leveldb-shared.store.dir = target/journal-SharedLeveldbJournalSpec } snapshot-store.local.dir = target/snapshots-SharedLeveldbJournalSpec } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 0 } } loglevel = ERROR log-dead-letters = 0 log-dead-letters-during-shutdown = off test.single-expect-default = 10s } """ class ExampleProcessor(probe: ActorRef, name: String) extends NamedProcessor(name) { def receive = { case Persistent(payload, _) ⇒ probe ! payload } } class ExampleApp(probe: ActorRef, storePath: ActorPath) extends Actor { val processor = context.actorOf(Props(classOf[ExampleProcessor], probe, context.system.name)) def receive = { case ActorIdentity(1, Some(store)) ⇒ SharedLeveldbJournal.setStore(store, context.system) case m ⇒ processor forward m } override def preStart(): Unit = { context.actorSelection(storePath) ! Identify(1) } } } class SharedLeveldbJournalSpec extends AkkaSpec(SharedLeveldbJournalSpec.config) with Cleanup { import SharedLeveldbJournalSpec._ val processorASystem = ActorSystem("processorA", system.settings.config) val processorBSystem = ActorSystem("processorB", system.settings.config) override protected def afterTermination() { shutdown(processorASystem) shutdown(processorBSystem) super.afterTermination() } "A LevelDB store" can { "be shared by multiple actor systems" in { val processorAProbe = new TestProbe(processorASystem) val processorBProbe = new TestProbe(processorBSystem) system.actorOf(Props[SharedLeveldbStore], "store") val storePath = RootActorPath(system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress) / "user" / "store" val appA = processorASystem.actorOf(Props(classOf[ExampleApp], processorAProbe.ref, storePath)) val appB = processorBSystem.actorOf(Props(classOf[ExampleApp], processorBProbe.ref, storePath)) appA ! Persistent("a1") appB ! Persistent("b1") processorAProbe.expectMsg("a1") processorBProbe.expectMsg("b1") val recoveredAppA = processorASystem.actorOf(Props(classOf[ExampleApp], processorAProbe.ref, storePath)) val recoveredAppB = processorBSystem.actorOf(Props(classOf[ExampleApp], processorBProbe.ref, storePath)) recoveredAppA ! Persistent("a2") recoveredAppB ! Persistent("b2") processorAProbe.expectMsg("a1") processorAProbe.expectMsg("a2") processorBProbe.expectMsg("b1") processorBProbe.expectMsg("b2") } } } Other Akka source code examplesHere is a short list of links related to this Akka SharedLeveldbJournalSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.