|
Akka/Scala example source code file (UdpDocSpec.scala)
// The UdpDocSpec.scala Akka example source code

package docs.io

import akka.testkit.AkkaSpec
import akka.actor.Actor
import akka.io.IO
import akka.io.Udp
import akka.actor.ActorRef
import java.net.InetSocketAddress
import akka.util.ByteString
import akka.testkit.TestProbe
import akka.actor.Props
import scala.concurrent.duration._
import akka.actor.PoisonPill
import akka.io.UdpConnected

// Example actors exercised by the specs below.
//
// NOTE(review): the `//#sender`, `//#listener` and `//#connected` line comments
// look like snippet markers consumed by documentation tooling; they are kept
// verbatim and in their original positions -- confirm before moving any of them.
object ScalaUdpDocSpec {

  // Fire-and-forget UDP sender: asks the UDP manager for a simple sender and,
  // once it is ready, turns every String it receives into a datagram addressed
  // to `remote`.
  //#sender
  class SimpleSender(remote: InetSocketAddress) extends Actor {
    import context.system
    IO(Udp) ! Udp.SimpleSender

    def receive: Receive = {
      case Udp.SimpleSenderReady =>
        context.become(ready(sender()))
      //#sender
        // Test hook: greet the remote side as soon as the sender is ready.
        sender() ! Udp.Send(ByteString("hello"), remote)
      //#sender
    }

    def ready(send: ActorRef): Receive = {
      case msg: String =>
        send ! Udp.Send(ByteString(msg), remote)
      //#sender
        // Test hook: "world" is the last message of the spec, so the sender
        // actor is shut down right after it has been handed over.
        if (msg == "world") send ! PoisonPill
      //#sender
    }
  }
  //#sender

  // UDP echo server: binds to localhost on port 0, echoes each datagram back to
  // its origin and passes the decoded payload on to `nextActor`.
  //#listener
  class Listener(nextActor: ActorRef) extends Actor {
    import context.system
    IO(Udp) ! Udp.Bind(self, new InetSocketAddress("localhost", 0))

    def receive: Receive = {
      case Udp.Bound(local) =>
        //#listener
        // Test hook: publish the actually bound address; `forward` keeps the
        // sender of the Bound message as the sender seen by `nextActor`.
        nextActor forward local
        //#listener
        context.become(ready(sender()))
    }

    def ready(socket: ActorRef): Receive = {
      case Udp.Received(data, remote) =>
        val processed = // parse data etc., e.g. using PipelineStage
          //#listener
          data.utf8String
        //#listener
        socket ! Udp.Send(data, remote) // example server echoes back
        nextActor ! processed
      case Udp.Unbind  => socket ! Udp.Unbind
      case Udp.Unbound => context.stop(self)
    }
  }
  //#listener

  // Connected-UDP client: connects to `remote`, sends Strings as datagrams and
  // stops itself once the connection reports that it is disconnected.
  //#connected
  class Connected(remote: InetSocketAddress) extends Actor {
    import context.system
    IO(UdpConnected) ! UdpConnected.Connect(self, remote)

    def receive: Receive = {
      case UdpConnected.Connected =>
        context.become(ready(sender()))
        //#connected
        // Test hook: greet the remote side as soon as the connection is up.
        sender() ! UdpConnected.Send(ByteString("hello"))
      //#connected
    }

    def ready(connection: ActorRef): Receive = {
      case UdpConnected.Received(data) =>
        // process data, send it on, etc.
        //#connected
        if (data.utf8String == "hello")
          connection ! UdpConnected.Send(ByteString("world"))
      //#connected
      case msg: String =>
        connection ! UdpConnected.Send(ByteString(msg))
      case d @ UdpConnected.Disconnect => connection ! d
      case UdpConnected.Disconnected   => context.stop(self)
    }
  }
  //#connected

}

// Shared scenarios; the concrete subclasses decide which Listener /
// SimpleSender / Connected implementations are instantiated.
abstract class UdpDocSpec extends AkkaSpec {

  def listenerProps(next: ActorRef): Props
  def simpleSenderProps(remote: InetSocketAddress): Props
  def connectedProps(remote: InetSocketAddress): Props

  "demonstrate Udp" in {
    val probe = TestProbe()
    val listen = watch(system.actorOf(listenerProps(probe.ref)))
    // The listener reports the address it was actually bound to.
    val local = probe.expectMsgType[InetSocketAddress]
    val send = system.actorOf(simpleSenderProps(local))
    probe.expectMsg("hello")
    send ! "world"
    probe.expectMsg("world")
    listen ! Udp.Unbind
    expectTerminated(listen)
  }

  "demonstrate Udp suspend reading" in {
    val probe = TestProbe()
    val listen = watch(system.actorOf(listenerProps(probe.ref)))
    val local = probe.expectMsgType[InetSocketAddress]
    // The bound address was forwarded, so lastSender is whoever sent Udp.Bound
    // to the listener -- presumably the UDP socket actor; verify if in doubt.
    val listener = probe.lastSender
    listener ! Udp.SuspendReading
    Thread.sleep(1000) // no way to find out when the above is finished
    val send = system.actorOf(simpleSenderProps(local))
    // Nothing may reach the probe while reading is suspended.
    probe.expectNoMsg(500.millis)
    listener ! Udp.ResumeReading
    probe.expectMsg("hello")
    send ! "world"
    probe.expectMsg("world")
    listen ! Udp.Unbind
    expectTerminated(listen)
  }

  "demonstrate UdpConnected" in {
    val probe = TestProbe()
    val listen = watch(system.actorOf(listenerProps(probe.ref)))
    val local = probe.expectMsgType[InetSocketAddress]
    val conn = watch(system.actorOf(connectedProps(local)))
    // The client greets with "hello"; the echoed "hello" makes it answer "world".
    probe.expectMsg("hello")
    probe.expectMsg("world")
    conn ! "hello"
    probe.expectMsg("hello")
    probe.expectMsg("world")
    listen ! Udp.Unbind
    expectTerminated(listen)
    conn ! UdpConnected.Disconnect
    expectTerminated(conn)
  }

}

// Runs the shared scenarios against the Scala example actors defined above.
class ScalaUdpDocSpec extends UdpDocSpec {
  import ScalaUdpDocSpec._

  override def listenerProps(next: ActorRef): Props =
    Props(new Listener(next))
  override def simpleSenderProps(remote: InetSocketAddress): Props =
    Props(new SimpleSender(remote))
  override def connectedProps(remote: InetSocketAddress): Props =
    Props(new Connected(remote))
}

// Runs the same scenarios against the actors imported from UdpDocTest
// (defined outside this file).
class JavaUdpDocSpec extends UdpDocSpec {
  import UdpDocTest._

  override def listenerProps(next: ActorRef): Props =
    Props(new Listener(next))
  override def simpleSenderProps(remote: InetSocketAddress): Props =
    Props(new SimpleSender(remote))
  override def connectedProps(remote: InetSocketAddress): Props =
    Props(new Connected(remote))
}

// Other Akka source code examples
// Here is a short list of links related to this Akka UdpDocSpec.scala source code file:
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.