|
Akka/Scala example source code file (ReadBackPressure.scala)
The ReadBackPressure.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io
import akka.actor.{ ActorRef, ActorLogging, Props, Actor, ActorSystem }
import akka.io.Tcp._
import akka.io.{ Tcp, IO }
import java.net.InetSocketAddress
import akka.testkit.{ ImplicitSender, TestProbe, AkkaSpec }
import akka.util.ByteString
object PullReadingExample {
class Listener(monitor: ActorRef) extends Actor {
import context.system
override def preStart: Unit =
//#pull-mode-bind
IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0), pullMode = true)
//#pull-mode-bind
def receive = {
//#pull-accepting
case Bound(localAddress) =>
// Accept connections one by one
sender() ! ResumeAccepting(batchSize = 1)
context.become(listening(sender()))
//#pull-accepting
monitor ! localAddress
}
//#pull-accepting-cont
def listening(listener: ActorRef): Receive = {
case Connected(remote, local) =>
val handler = context.actorOf(Props(classOf[PullEcho], sender()))
sender() ! Register(handler, keepOpenOnPeerClosed = true)
listener ! ResumeAccepting(batchSize = 1)
}
//#pull-accepting-cont
}
case object Ack extends Event
class PullEcho(connection: ActorRef) extends Actor {
//#pull-reading-echo
override def preStart: Unit = connection ! ResumeReading
def receive = {
case Received(data) => connection ! Write(data, Ack)
case Ack => connection ! ResumeReading
}
//#pull-reading-echo
}
}
class PullReadingSpec extends AkkaSpec with ImplicitSender {
"demonstrate pull reading" in {
val probe = TestProbe()
system.actorOf(Props(classOf[PullReadingExample.Listener], probe.ref), "server")
val listenAddress = probe.expectMsgType[InetSocketAddress]
//#pull-mode-connect
IO(Tcp) ! Connect(listenAddress, pullMode = true)
//#pull-mode-connect
expectMsgType[Connected]
val connection = lastSender
val client = TestProbe()
client.send(connection, Register(client.ref))
client.send(connection, Write(ByteString("hello")))
client.send(connection, ResumeReading)
client.expectMsg(Received(ByteString("hello")))
system.shutdown()
system.awaitTermination
}
}
Other Akka source code examplesHere is a short list of links related to this Akka ReadBackPressure.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.