Akka/Scala example source code file (EchoServer.scala)
// The EchoServer.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package docs.io

import java.net.InetSocketAddress

import scala.concurrent.duration.DurationInt

import com.typesafe.config.ConfigFactory

import akka.actor.{ Actor, ActorDSL, ActorLogging, ActorRef, ActorSystem, Props, SupervisorStrategy }
import akka.actor.ActorDSL.inbox
import akka.io.{ IO, Tcp }
import akka.util.ByteString

/**
 * Entry point: starts an actor system with two [[EchoManager]] instances (one per
 * handler implementation) and blocks until one of them terminates or ten minutes pass.
 */
object EchoServer extends App {

  val config = ConfigFactory.parseString("akka.loglevel = DEBUG")
  implicit val system: ActorSystem = ActorSystem("EchoServer", config)

  // make sure to stop the system so that the application stops
  try run()
  finally system.shutdown()

  /**
   * Creates the two managers, watches both through an inbox and waits (up to ten
   * minutes) for the first message delivered to that inbox.
   */
  def run(): Unit = {
    import ActorDSL._

    // create two EchoManager and stop the application once one dies
    val watcher = inbox()
    watcher.watch(system.actorOf(Props(classOf[EchoManager], classOf[EchoHandler]), "echo"))
    watcher.watch(system.actorOf(Props(classOf[EchoManager], classOf[SimpleEchoHandler]), "simple"))
    watcher.receive(10.minutes)
  }

}

/**
 * Binds a TCP listener on `localhost` (port 0, i.e. whatever port the bind reports back)
 * and spawns one handler actor per accepted connection.
 *
 * @param handlerClass actor class instantiated for every connection; it is constructed
 *                     with `(connection: ActorRef, remote: InetSocketAddress)`
 */
class EchoManager(handlerClass: Class[_]) extends Actor with ActorLogging {

  import Tcp._
  import context.system

  // there is not recovery for broken connections
  override val supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy

  // bind to the listen port; the port will automatically be closed once this actor dies
  override def preStart(): Unit = {
    IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0))
  }

  // do not restart
  override def postRestart(thr: Throwable): Unit = context.stop(self)

  def receive: Receive = {
    case Bound(localAddress) =>
      log.info("listening on port {}", localAddress.getPort)

    case CommandFailed(Bind(_, local, _, _, _)) =>
      log.warning(s"cannot bind to [$local]")
      context.stop(self)

    //#echo-manager
    case Connected(remote, local) =>
      log.info("received connection from {}", remote)
      // sender() is the connection actor; it is handed to the handler and told
      // to deliver this connection's events to that handler
      val handler = context.actorOf(Props(handlerClass, sender(), remote))
      sender() ! Register(handler, keepOpenOnPeerClosed = true)
    //#echo-manager
  }

}

object EchoHandler {
  /** Props for an [[EchoHandler]] serving `connection`, whose peer is `remote`. */
  def props(connection: ActorRef, remote: InetSocketAddress): Props =
    Props(classOf[EchoHandler], connection, remote)
}

//#echo-handler
/**
 * Echoes received data back over `connection`, buffering every chunk until its write
 * has been acknowledged. Starts in `writing` (write immediately, react to failures),
 * falls back to `buffering` after a failed write, and uses `closing` to flush the
 * remaining buffer once the peer has closed.
 *
 * @param connection the TCP connection actor to write to
 * @param remote     address of the peer, used for logging only
 */
class EchoHandler(connection: ActorRef, remote: InetSocketAddress)
  extends Actor with ActorLogging {

  import Tcp._

  /** Write acknowledgement carrying the absolute offset of the chunk that was written. */
  final case class Ack(offset: Int) extends Event

  // sign death pact: this actor terminates when connection breaks
  context.watch(connection)

  // start out in optimistic write-through mode
  def receive: Receive = writing

  //#writing
  /** Writes every received chunk straight away and keeps it buffered until it is acked. */
  def writing: Receive = {
    case Received(data) =>
      // the offset must be computed before buffer() appends the chunk
      connection ! Write(data, Ack(currentOffset))
      buffer(data)

    case Ack(ack) =>
      acknowledge(ack)

    case CommandFailed(Write(_, Ack(ack))) =>
      connection ! ResumeWriting
      context.become(buffering(ack))

    case PeerClosed =>
      if (storage.isEmpty) context.stop(self)
      else context.become(closing)
  }
  //#writing

  //#buffering
  /**
   * Behaviour used after the write with offset `nack` failed: incoming data is only
   * buffered, and buffered chunks are written one at a time for a while before
   * returning to `writing` (or `closing` if the peer closed in the meantime).
   *
   * @param nack offset of the write that was rejected
   */
  def buffering(nack: Int): Receive = {
    var toAck = 10
    var peerClosed = false

    // Bound to a typed val rather than left as a bare `{ case ... }` block, which
    // would otherwise depend on a preceding blank line to parse as intended.
    val behavior: Receive = {
      case Received(data)         => buffer(data)
      case WritingResumed         => writeFirst()
      case PeerClosed             => peerClosed = true
      case Ack(ack) if ack < nack => acknowledge(ack)
      case Ack(ack) =>
        acknowledge(ack)
        if (storage.nonEmpty) {
          if (toAck > 0) {
            // stay in ACK-based mode for a while
            writeFirst()
            toAck -= 1
          } else {
            // then return to NACK-based again
            writeAll()
            context.become(if (peerClosed) closing else writing)
          }
        } else if (peerClosed) context.stop(self)
        else context.become(writing)
    }

    behavior
  }
  //#buffering

  //#closing
  /** Flushes what is left in the buffer after the peer closed, then stops this actor. */
  def closing: Receive = {
    case CommandFailed(_: Write) =>
      connection ! ResumeWriting
      context.become({

        case WritingResumed =>
          writeAll()
          context.unbecome()

        // Acknowledgements are always sent wrapped in Ack (see writing, writeFirst
        // and writeAll); the previous `case ack: Int` pattern could never match, so
        // acks arriving here were dropped and the data re-sent by writeAll().
        case Ack(ack) => acknowledge(ack)

      }, discardOld = false)

    case Ack(ack) =>
      acknowledge(ack)
      if (storage.isEmpty) context.stop(self)
  }
  //#closing

  override def postStop(): Unit = {
    log.info(s"transferred $transferred bytes from/to [$remote]")
  }

  //#storage-omitted
  // offset of the first chunk still held in `storage`
  private var storageOffset = 0
  // chunks written (or waiting to be written) but not yet acknowledged
  private var storage = Vector.empty[ByteString]
  // total number of bytes currently held in `storage`
  private var stored = 0L
  // total number of bytes acknowledged so far
  private var transferred = 0L

  val maxStored = 100000000L
  val highWatermark = maxStored * 5 / 10
  val lowWatermark = maxStored * 3 / 10
  private var suspended = false

  /** Offset the next buffered chunk will get. */
  private def currentOffset: Int = storageOffset + storage.size

  //#helpers
  /**
   * Appends `data` to the buffer. Stops this actor if more than `maxStored` bytes are
   * held; otherwise suspends reading once the high watermark is exceeded.
   */
  private def buffer(data: ByteString): Unit = {
    storage :+= data
    stored += data.size

    if (stored > maxStored) {
      log.warning(s"drop connection to [$remote] (buffer overrun)")
      context.stop(self)

    } else if (stored > highWatermark) {
      log.debug(s"suspending reading at $currentOffset")
      connection ! SuspendReading
      suspended = true
    }
  }

  /**
   * Drops the oldest buffered chunk, which must be the one with offset `ack`, and
   * resumes reading when the buffer has fallen below the low watermark.
   *
   * Throws `IllegalArgumentException` (via `require`) if `ack` is not the offset of
   * the oldest chunk or if the buffer is empty.
   */
  private def acknowledge(ack: Int): Unit = {
    require(ack == storageOffset, s"received ack $ack at $storageOffset")
    require(storage.nonEmpty, s"storage was empty at ack $ack")

    val size = storage.head.size
    stored -= size
    transferred += size

    storageOffset += 1
    storage = storage.drop(1)

    if (suspended && stored < lowWatermark) {
      log.debug("resuming reading")
      connection ! ResumeReading
      suspended = false
    }
  }
  //#helpers

  /** Writes the oldest buffered chunk; callers only invoke this with a non-empty buffer. */
  private def writeFirst(): Unit = {
    connection ! Write(storage.head, Ack(storageOffset))
  }

  /** Writes every buffered chunk in order, each tagged with its absolute offset. */
  private def writeAll(): Unit = {
    for ((data, i) <- storage.zipWithIndex) {
      connection ! Write(data, Ack(storageOffset + i))
    }
  }

  //#storage-omitted
}
//#echo-handler

//#simple-echo-handler
/**
 * Echo handler that keeps a single write in flight: the first received chunk is
 * written immediately, further chunks are buffered and written one by one as each
 * acknowledgement arrives.
 *
 * @param connection the TCP connection actor to write to
 * @param remote     address of the peer, used for logging only
 */
class SimpleEchoHandler(connection: ActorRef, remote: InetSocketAddress)
  extends Actor with ActorLogging {

  import Tcp._

  // sign death pact: this actor terminates when connection breaks
  context.watch(connection)

  /** Acknowledgement token attached to every write. */
  case object Ack extends Event

  def receive: Receive = {
    case Received(data) =>
      buffer(data)
      connection ! Write(data, Ack)

      // while a write is outstanding only buffer; acknowledge() pops this
      // behaviour again once the buffer has been drained
      context.become({
        case Received(data) => buffer(data)
        case Ack            => acknowledge()
        case PeerClosed     => closing = true
      }, discardOld = false)

    case PeerClosed => context.stop(self)
  }

  //#storage-omitted
  override def postStop(): Unit = {
    log.info(s"transferred $transferred bytes from/to [$remote]")
  }

  var storage = Vector.empty[ByteString]
  var stored = 0L
  var transferred = 0L
  var closing = false

  val maxStored = 100000000L
  val highWatermark = maxStored * 5 / 10
  val lowWatermark = maxStored * 3 / 10
  var suspended = false

  //#simple-helpers
  /**
   * Appends `data` to the buffer. Stops this actor if more than `maxStored` bytes are
   * held; otherwise suspends reading once the high watermark is exceeded.
   */
  private def buffer(data: ByteString): Unit = {
    storage :+= data
    stored += data.size

    if (stored > maxStored) {
      log.warning(s"drop connection to [$remote] (buffer overrun)")
      context.stop(self)

    } else if (stored > highWatermark) {
      log.debug(s"suspending reading")
      connection ! SuspendReading
      suspended = true
    }
  }

  /**
   * Drops the oldest buffered chunk, resumes reading when below the low watermark,
   * then either writes the next chunk or, with an empty buffer, stops (if the peer
   * closed) or returns to the initial behaviour.
   *
   * Throws `IllegalArgumentException` (via `require`) if the buffer is empty.
   */
  private def acknowledge(): Unit = {
    require(storage.nonEmpty, "storage was empty")

    val size = storage.head.size
    stored -= size
    transferred += size

    storage = storage.drop(1)

    if (suspended && stored < lowWatermark) {
      log.debug("resuming reading")
      connection ! ResumeReading
      suspended = false
    }

    if (storage.isEmpty) {
      if (closing) context.stop(self)
      else context.unbecome()
    } else connection ! Write(storage.head, Ack)
  }
  //#simple-helpers
  //#storage-omitted
}
//#simple-echo-handler

// Other Akka source code examples
// Here is a short list of links related to this Akka EchoServer.scala source code file:
... this post is sponsored by my books ...
#1 New Release!
FP Best Seller
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.