|
Akka/Scala example source code file (PerformanceSpec.scala)
The PerformanceSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import scala.language.postfixOps
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.testkit._
object PerformanceSpec {
// multiply cycles with 200 for more
// accurate throughput measurements
val config =
"""
akka.persistence.performance.cycles.warmup = 300
akka.persistence.performance.cycles.load = 1000
"""
case object StartMeasure
case object StopMeasure
final case class FailAt(sequenceNr: Long)
trait Measure extends { this: Actor ⇒
val NanoToSecond = 1000.0 * 1000 * 1000
var startTime: Long = 0L
var stopTime: Long = 0L
var startSequenceNr = 0L
var stopSequenceNr = 0L
def startMeasure(): Unit = {
startSequenceNr = lastSequenceNr
startTime = System.nanoTime
}
def stopMeasure(): Unit = {
stopSequenceNr = lastSequenceNr
stopTime = System.nanoTime
sender() ! (NanoToSecond * (stopSequenceNr - startSequenceNr) / (stopTime - startTime))
}
def lastSequenceNr: Long
}
class PerformanceTestDestination extends Actor with Measure {
var lastSequenceNr = 0L
val confirm: PartialFunction[Any, Any] = {
case cp @ ConfirmablePersistent(payload, sequenceNr, _) ⇒
lastSequenceNr = sequenceNr
cp.confirm()
payload
}
def receive = confirm andThen {
case StartMeasure ⇒ startMeasure()
case StopMeasure ⇒ stopMeasure()
case m ⇒ if (lastSequenceNr % 1000 == 0) print(".")
}
}
abstract class PerformanceTestProcessor(name: String) extends NamedProcessor(name) with Measure {
var failAt: Long = -1
val controlBehavior: Receive = {
case StartMeasure ⇒ startMeasure()
case StopMeasure ⇒ stopMeasure()
case FailAt(sequenceNr) ⇒ failAt = sequenceNr
}
override def postRestart(reason: Throwable) {
super.postRestart(reason)
receive(StartMeasure)
}
}
class CommandsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) {
def receive = controlBehavior orElse {
case p: Persistent ⇒
if (lastSequenceNr % 1000 == 0) if (recoveryRunning) print("r") else print(".")
if (lastSequenceNr == failAt) throw new TestException("boom")
}
}
class CommandsourcedTestPersistentActor(name: String) extends PerformanceTestProcessor(name) with PersistentActor {
override val controlBehavior: Receive = {
case StartMeasure ⇒ startMeasure()
case StopMeasure ⇒ defer(StopMeasure)(_ ⇒ stopMeasure())
case FailAt(sequenceNr) ⇒ failAt = sequenceNr
}
val receiveRecover: Receive = {
case _ ⇒ if (lastSequenceNr % 1000 == 0) print("r")
}
val receiveCommand: Receive = controlBehavior orElse {
case cmd ⇒ persistAsync(cmd) { _ ⇒
if (lastSequenceNr % 1000 == 0) print(".")
if (lastSequenceNr == failAt) throw new TestException("boom")
}
}
}
class EventsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) with PersistentActor {
val receiveRecover: Receive = {
case _ ⇒ if (lastSequenceNr % 1000 == 0) print("r")
}
val receiveCommand: Receive = controlBehavior orElse {
case cmd ⇒ persist(cmd) { _ ⇒
if (lastSequenceNr % 1000 == 0) print(".")
if (lastSequenceNr == failAt) throw new TestException("boom")
}
}
}
class StashingEventsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) with PersistentActor {
val receiveRecover: Receive = {
case _ ⇒ if (lastSequenceNr % 1000 == 0) print("r")
}
val printProgress: PartialFunction[Any, Any] = {
case m ⇒ if (lastSequenceNr % 1000 == 0) print("."); m
}
val receiveCommand: Receive = printProgress andThen (controlBehavior orElse {
case "a" ⇒ persist("a")(_ ⇒ context.become(processC))
case "b" ⇒ persist("b")(_ ⇒ ())
})
val processC: Receive = printProgress andThen {
case "c" ⇒
persist("c")(_ ⇒ context.unbecome())
unstashAll()
case other ⇒ stash()
}
}
}
class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "PerformanceSpec", serialization = "off").withFallback(ConfigFactory.parseString(PerformanceSpec.config))) with PersistenceSpec with ImplicitSender {
import PerformanceSpec._
val warmupCycles = system.settings.config.getInt("akka.persistence.performance.cycles.warmup")
val loadCycles = system.settings.config.getInt("akka.persistence.performance.cycles.load")
def stressCommandsourcedProcessor(failAt: Option[Long]): Unit = {
val processor = namedProcessor[CommandsourcedTestProcessor]
failAt foreach { processor ! FailAt(_) }
1 to warmupCycles foreach { i ⇒ processor ! Persistent(s"msg${i}") }
processor ! StartMeasure
1 to loadCycles foreach { i ⇒ processor ! Persistent(s"msg${i}") }
processor ! StopMeasure
expectMsgPF(100 seconds) {
case throughput: Double ⇒ println(f"\nthroughput = $throughput%.2f persistent processor commands per second")
}
}
def stressCommandsourcedPersistentActor(failAt: Option[Long]): Unit = {
val processor = namedProcessor[CommandsourcedTestPersistentActor]
failAt foreach { processor ! FailAt(_) }
1 to warmupCycles foreach { i ⇒ processor ! s"msg${i}" }
processor ! StartMeasure
1 to loadCycles foreach { i ⇒ processor ! s"msg${i}" }
processor ! StopMeasure
expectMsgPF(100 seconds) {
case throughput: Double ⇒ println(f"\nthroughput = $throughput%.2f persistent actor commands per second")
}
}
def stressPersistentActor(failAt: Option[Long]): Unit = {
val processor = namedProcessor[EventsourcedTestProcessor]
failAt foreach { processor ! FailAt(_) }
1 to warmupCycles foreach { i ⇒ processor ! s"msg${i}" }
processor ! StartMeasure
1 to loadCycles foreach { i ⇒ processor ! s"msg${i}" }
processor ! StopMeasure
expectMsgPF(100 seconds) {
case throughput: Double ⇒ println(f"\nthroughput = $throughput%.2f persistent events per second")
}
}
def stressStashingPersistentActor(): Unit = {
val processor = namedProcessor[StashingEventsourcedTestProcessor]
1 to warmupCycles foreach { i ⇒ processor ! "b" }
processor ! StartMeasure
val cmds = 1 to (loadCycles / 3) flatMap (_ ⇒ List("a", "b", "c"))
processor ! StartMeasure
cmds foreach (processor ! _)
processor ! StopMeasure
expectMsgPF(100 seconds) {
case throughput: Double ⇒ println(f"\nthroughput = $throughput%.2f persistent events per second")
}
}
def stressPersistentChannel(): Unit = {
val channel = system.actorOf(PersistentChannel.props())
val destination = system.actorOf(Props[PerformanceTestDestination])
1 to warmupCycles foreach { i ⇒ channel ! Deliver(PersistentRepr(s"msg${i}", persistenceId = "test"), destination.path) }
channel ! Deliver(Persistent(StartMeasure), destination.path)
1 to loadCycles foreach { i ⇒ channel ! Deliver(PersistentRepr(s"msg${i}", persistenceId = "test"), destination.path) }
channel ! Deliver(Persistent(StopMeasure), destination.path)
expectMsgPF(100 seconds) {
case throughput: Double ⇒ println(f"\nthroughput = $throughput%.2f persistent messages per second")
}
}
def subscribeToConfirmation(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[DeliveredByPersistentChannel])
def awaitConfirmation(probe: TestProbe): Unit =
probe.expectMsgType[DeliveredByPersistentChannel]
"A command sourced processor" should {
"have some reasonable throughput" in {
stressCommandsourcedProcessor(None)
}
"have some reasonable throughput under failure conditions" in {
stressCommandsourcedProcessor(Some(warmupCycles + loadCycles / 10))
}
}
"A command sourced persistent actor" should {
"have some reasonable throughput" in {
stressCommandsourcedPersistentActor(None)
}
}
"An event sourced persistent actor" should {
"have some reasonable throughput" in {
stressPersistentActor(None)
}
"have some reasonable throughput under failure conditions" in {
stressPersistentActor(Some(warmupCycles + loadCycles / 10))
}
"have some reasonable throughput with stashing and unstashing every 3rd command" in {
stressStashingPersistentActor()
}
}
"A persistent channel" should {
"have some reasonable throughput" in {
val probe = TestProbe()
subscribeToConfirmation(probe)
stressPersistentChannel()
probe.fishForMessage(100.seconds) {
case DeliveredByPersistentChannel(_, snr, _, _) ⇒ snr == warmupCycles + loadCycles + 2
}
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka PerformanceSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.