|
Akka/Scala example source code file (UdpDocSpec.scala)
The UdpDocSpec.scala Akka example source code
package docs.io
import akka.testkit.AkkaSpec
import akka.actor.Actor
import akka.io.IO
import akka.io.Udp
import akka.actor.ActorRef
import java.net.InetSocketAddress
import akka.util.ByteString
import akka.testkit.TestProbe
import akka.actor.Props
import scala.concurrent.duration._
import akka.actor.PoisonPill
import akka.io.UdpConnected
object ScalaUdpDocSpec {
//#sender
class SimpleSender(remote: InetSocketAddress) extends Actor {
import context.system
IO(Udp) ! Udp.SimpleSender
def receive = {
case Udp.SimpleSenderReady =>
context.become(ready(sender()))
//#sender
sender() ! Udp.Send(ByteString("hello"), remote)
//#sender
}
def ready(send: ActorRef): Receive = {
case msg: String =>
send ! Udp.Send(ByteString(msg), remote)
//#sender
if (msg == "world") send ! PoisonPill
//#sender
}
}
//#sender
//#listener
class Listener(nextActor: ActorRef) extends Actor {
import context.system
IO(Udp) ! Udp.Bind(self, new InetSocketAddress("localhost", 0))
def receive = {
case Udp.Bound(local) =>
//#listener
nextActor forward local
//#listener
context.become(ready(sender()))
}
def ready(socket: ActorRef): Receive = {
case Udp.Received(data, remote) =>
val processed = // parse data etc., e.g. using PipelineStage
//#listener
data.utf8String
//#listener
socket ! Udp.Send(data, remote) // example server echoes back
nextActor ! processed
case Udp.Unbind => socket ! Udp.Unbind
case Udp.Unbound => context.stop(self)
}
}
//#listener
//#connected
class Connected(remote: InetSocketAddress) extends Actor {
import context.system
IO(UdpConnected) ! UdpConnected.Connect(self, remote)
def receive = {
case UdpConnected.Connected =>
context.become(ready(sender()))
//#connected
sender() ! UdpConnected.Send(ByteString("hello"))
//#connected
}
def ready(connection: ActorRef): Receive = {
case UdpConnected.Received(data) =>
// process data, send it on, etc.
//#connected
if (data.utf8String == "hello")
connection ! UdpConnected.Send(ByteString("world"))
//#connected
case msg: String =>
connection ! UdpConnected.Send(ByteString(msg))
case d @ UdpConnected.Disconnect => connection ! d
case UdpConnected.Disconnected => context.stop(self)
}
}
//#connected
}
abstract class UdpDocSpec extends AkkaSpec {
def listenerProps(next: ActorRef): Props
def simpleSenderProps(remote: InetSocketAddress): Props
def connectedProps(remote: InetSocketAddress): Props
"demonstrate Udp" in {
val probe = TestProbe()
val listen = watch(system.actorOf(listenerProps(probe.ref)))
val local = probe.expectMsgType[InetSocketAddress]
val listener = probe.lastSender
val send = system.actorOf(simpleSenderProps(local))
probe.expectMsg("hello")
send ! "world"
probe.expectMsg("world")
listen ! Udp.Unbind
expectTerminated(listen)
}
"demonstrate Udp suspend reading" in {
val probe = TestProbe()
val listen = watch(system.actorOf(listenerProps(probe.ref)))
val local = probe.expectMsgType[InetSocketAddress]
val listener = probe.lastSender
listener ! Udp.SuspendReading
Thread.sleep(1000) // no way to find out when the above is finished
val send = system.actorOf(simpleSenderProps(local))
probe.expectNoMsg(500.millis)
listener ! Udp.ResumeReading
probe.expectMsg("hello")
send ! "world"
probe.expectMsg("world")
listen ! Udp.Unbind
expectTerminated(listen)
}
"demonstrate UdpConnected" in {
val probe = TestProbe()
val listen = watch(system.actorOf(listenerProps(probe.ref)))
val local = probe.expectMsgType[InetSocketAddress]
val listener = probe.lastSender
val conn = watch(system.actorOf(connectedProps(local)))
probe.expectMsg("hello")
probe.expectMsg("world")
conn ! "hello"
probe.expectMsg("hello")
probe.expectMsg("world")
listen ! Udp.Unbind
expectTerminated(listen)
conn ! UdpConnected.Disconnect
expectTerminated(conn)
}
}
class ScalaUdpDocSpec extends UdpDocSpec {
import ScalaUdpDocSpec._
override def listenerProps(next: ActorRef) = Props(new Listener(next))
override def simpleSenderProps(remote: InetSocketAddress) = Props(new SimpleSender(remote))
override def connectedProps(remote: InetSocketAddress) = Props(new Connected(remote))
}
class JavaUdpDocSpec extends UdpDocSpec {
import UdpDocTest._
override def listenerProps(next: ActorRef) = Props(new Listener(next))
override def simpleSenderProps(remote: InetSocketAddress) = Props(new SimpleSender(remote))
override def connectedProps(remote: InetSocketAddress) = Props(new Connected(remote))
}
Other Akka source code examplesHere is a short list of links related to this Akka UdpDocSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.