|
Akka/Scala example source code file (PersistentPublisher.scala)
The PersistentPublisher.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.persistence.stream import scala.util.control.NonFatal import scala.concurrent.duration._ import org.reactivestreams.api.Producer import org.reactivestreams.spi.Subscriber import akka.actor._ import akka.persistence._ import akka.stream._ import akka.stream.impl._ import akka.stream.impl.Ast.ProducerNode import akka.stream.scaladsl.Flow // ------------------------------------------------------------------------------------------------ // FIXME: move this file to akka-persistence-experimental once going back to project dependencies // NOTE: "producer" has been changed to "publisher" wherever possible, covering the upcoming // changes in reactive-streams. // ------------------------------------------------------------------------------------------------ object PersistentFlow { /** * Starts a new [[Persistent]] message flow from the given processor, * identified by `processorId`. Elements are pulled from the processor's * journal (using a [[View]]) in accordance with the demand coming from * the downstream transformation steps. * * Elements pulled from the processor's journal are buffered in memory so that * fine-grained demands (requests) from downstream can be served efficiently. */ def fromProcessor(processorId: String): Flow[Persistent] = fromProcessor(processorId, PersistentPublisherSettings()) /** * Starts a new [[Persistent]] message flow from the given processor, * identified by `processorId`. Elements are pulled from the processor's * journal (using a [[View]]) in accordance with the demand coming from * the downstream transformation steps. * * Elements pulled from the processor's journal are buffered in memory so that * fine-grained demands (requests) from downstream can be served efficiently. * Reads from the journal are done in (coarse-grained) batches of configurable * size (which correspond to the configurable maximum buffer size). * * @see [[PersistentPublisherSettings]] */ def fromProcessor(processorId: String, publisherSettings: PersistentPublisherSettings): Flow[Persistent] = FlowImpl(PersistentPublisherNode(processorId, publisherSettings), Nil) } /** * Configuration object for a [[Persistent]] stream publisher. * * @param fromSequenceNr Sequence number where the published stream shall start (inclusive). * Default is `1L`. * @param maxBufferSize Maximum number of persistent messages to be buffered in memory (per publisher). * Default is `100`. * @param idle Optional duration to wait if no more persistent messages can be pulled from the journal * before attempting the next pull. Default is `None` which causes the publisher to take * the value defined by the `akka.persistence.view.auto-update-interval` configuration * key. If defined, the `idle` value is taken directly. */ case class PersistentPublisherSettings(fromSequenceNr: Long = 1L, maxBufferSize: Int = 100, idle: Option[FiniteDuration] = None) { require(fromSequenceNr > 0L, "fromSequenceNr must be > 0") } private object PersistentPublisher { def props(processorId: String, publisherSettings: PersistentPublisherSettings, settings: MaterializerSettings): Props = Props(classOf[PersistentPublisherImpl], processorId, publisherSettings, settings) } private case class PersistentPublisherNode(processorId: String, publisherSettings: PersistentPublisherSettings) extends ProducerNode[Persistent] { def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[Persistent] = new ActorProducer(context.actorOf(PersistentPublisher.props(processorId, publisherSettings, settings))) } private class PersistentPublisherImpl(processorId: String, publisherSettings: PersistentPublisherSettings, materializerSettings: MaterializerSettings) extends Actor with ActorLogging with SubscriberManagement[Persistent] with SoftShutdown { import ActorBasedFlowMaterializer._ import PersistentPublisherBuffer._ type S = ActorSubscription[Persistent] private val buffer = context.actorOf(Props(classOf[PersistentPublisherBuffer], processorId, publisherSettings, self), "publisherBuffer") private var pub: ActorPublisher[Persistent] = _ private var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason final def receive = { case ExposedPublisher(pub) ⇒ this.pub = pub.asInstanceOf[ActorPublisher[Persistent]] context.become(waitingForSubscribers) } final def waitingForSubscribers: Receive = { case SubscribePending ⇒ pub.takePendingSubscribers() foreach registerSubscriber context.become(active) } final def active: Receive = { case SubscribePending ⇒ pub.takePendingSubscribers() foreach registerSubscriber case RequestMore(sub, elements) ⇒ moreRequested(sub.asInstanceOf[S], elements) case Cancel(sub) ⇒ unregisterSubscription(sub.asInstanceOf[S]) case Response(ps) ⇒ try { ps.foreach(pushToDownstream) } catch { case Stop ⇒ { completeDownstream(); shutdownReason = None } case NonFatal(e) ⇒ { abortDownstream(e); shutdownReason = Some(e) } } } override def requestFromUpstream(elements: Int): Unit = buffer ! Request(elements) override def initialBufferSize = materializerSettings.initialFanOutBufferSize override def maxBufferSize = materializerSettings.maxFanOutBufferSize override def createSubscription(subscriber: Subscriber[Persistent]): ActorSubscription[Persistent] = new ActorSubscription(self, subscriber) override def cancelUpstream(): Unit = { pub.shutdown(shutdownReason) context.stop(buffer) softShutdown() } override def shutdown(completed: Boolean): Unit = { pub.shutdown(shutdownReason) context.stop(buffer) softShutdown() } override def postStop(): Unit = { pub.shutdown(shutdownReason) } } private object PersistentPublisherBuffer { case class Request(num: Int) case class Response(messages: Vector[Persistent]) case object Fill case object Filled } /** * A view that buffers up to `publisherSettings.maxBufferSize` persistent messages in memory. * Downstream demands (requests) are served if the buffer is non-empty either while filling * the buffer or after having filled the buffer. When the buffer becomes empty new persistent * messages are loaded from the journal (in batches up to `publisherSettings.maxBufferSize`). */ private class PersistentPublisherBuffer(override val processorId: String, publisherSettings: PersistentPublisherSettings, publisher: ActorRef) extends View { import PersistentPublisherBuffer._ import context.dispatcher private var replayed = 0 private var requested = 0 private var buffer: Vector[Persistent] = Vector.empty private val filling: Receive = { case p: Persistent ⇒ buffer :+= p replayed += 1 if (requested > 0) respond(requested) case Filled ⇒ if (buffer.nonEmpty && requested > 0) respond(requested) if (buffer.nonEmpty) pause() else if (replayed > 0) fill() else schedule() case Request(num) ⇒ requested += num if (buffer.nonEmpty) respond(requested) } private val pausing: Receive = { case Request(num) ⇒ requested += num respond(requested) if (buffer.isEmpty) fill() } private val scheduled: Receive = { case Fill ⇒ fill() case Request(num) ⇒ requested += num } def receive = filling override def onReplaySuccess(receive: Receive, await: Boolean): Unit = { super.onReplaySuccess(receive, await) self ! Filled } override def onReplayFailure(receive: Receive, await: Boolean, cause: Throwable): Unit = { super.onReplayFailure(receive, await, cause) self ! Filled } override def lastSequenceNr: Long = math.max(publisherSettings.fromSequenceNr - 1L, super.lastSequenceNr) override def autoUpdateInterval: FiniteDuration = publisherSettings.idle.getOrElse(super.autoUpdateInterval) override def autoUpdateReplayMax: Long = publisherSettings.maxBufferSize override def autoUpdate: Boolean = false private def fill(): Unit = { replayed = 0 context.become(filling) self ! Update(await = false, autoUpdateReplayMax) } private def pause(): Unit = { context.become(pausing) } private def schedule(): Unit = { context.become(scheduled) context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Fill) } private def respond(num: Int): Unit = { val (res, buf) = buffer.splitAt(num) publisher ! Response(res) buffer = buf requested -= res.size } } Other Akka source code examplesHere is a short list of links related to this Akka PersistentPublisher.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.