|
Akka/Scala example source code file (UdpConnected.scala)
The UdpConnected.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.io import java.lang.{ Iterable ⇒ JIterable } import java.net.InetSocketAddress import scala.collection.immutable import akka.io.Inet.SocketOption import akka.io.Udp.UdpSettings import akka.util.ByteString import akka.actor._ /** * UDP Extension for Akka’s IO layer. * * This extension implements the connectionless UDP protocol with * calling `connect` on the underlying sockets, i.e. with restricting * from whom data can be received. For “unconnected” UDP mode see [[Udp]]. * * For a full description of the design and philosophy behind this IO * implementation please refer to <a href="http://doc.akka.io/">the Akka online documentation</a>. * * The Java API for generating UDP commands is available at [[UdpConnectedMessage]]. */ object UdpConnected extends ExtensionId[UdpConnectedExt] with ExtensionIdProvider { override def lookup = UdpConnected override def createExtension(system: ExtendedActorSystem): UdpConnectedExt = new UdpConnectedExt(system) /** * Java API: retrieve the UdpConnected extension for the given system. */ override def get(system: ActorSystem): UdpConnectedExt = super.get(system) /** * The common interface for [[Command]] and [[Event]]. */ sealed trait Message /** * The common type of all commands supported by the UDP implementation. */ trait Command extends SelectionHandler.HasFailureMessage with Message { def failureMessage = CommandFailed(this) } /** * Each [[Send]] can optionally request a positive acknowledgment to be sent * to the commanding actor. If such notification is not desired the [[Send#ack]] * must be set to an instance of this class. The token contained within can be used * to recognize which write failed when receiving a [[CommandFailed]] message. */ case class NoAck(token: Any) extends Event /** * Default [[NoAck]] instance which is used when no acknowledgment information is * explicitly provided. Its “token” is `null`. */ object NoAck extends NoAck(null) /** * This message is understood by the connection actors to send data to their * designated destination. The connection actor will respond with * [[CommandFailed]] if the send could not be enqueued to the O/S kernel * because the send buffer was full. If the given `ack` is not of type [[NoAck]] * the connection actor will reply with the given object as soon as the datagram * has been successfully enqueued to the O/S kernel. */ final case class Send(payload: ByteString, ack: Any) extends Command { require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") def wantsAck: Boolean = !ack.isInstanceOf[NoAck] } object Send { def apply(data: ByteString): Send = Send(data, NoAck) } /** * Send this message to the [[UdpExt#manager]] in order to bind to a local * port (optionally with the chosen `localAddress`) and create a UDP socket * which is restricted to sending to and receiving from the given `remoteAddress`. * All received datagrams will be sent to the designated `handler` actor. */ final case class Connect(handler: ActorRef, remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[SocketOption] = Nil) extends Command /** * Send this message to a connection actor (which had previously sent the * [[Connected]] message) in order to close the socket. The connection actor * will reply with a [[Disconnected]] message. */ case object Disconnect extends Command /** * Send this message to a listener actor (which sent a [[Udp.Bound]] message) to * have it stop reading datagrams from the network. If the O/S kernel’s receive * buffer runs full then subsequent datagrams will be silently discarded. * Re-enable reading from the socket using the [[ResumeReading]] command. */ case object SuspendReading extends Command /** * This message must be sent to the listener actor to re-enable reading from * the socket after a [[SuspendReading]] command. */ case object ResumeReading extends Command /** * The common type of all events emitted by the UDP implementation. */ trait Event extends Message /** * When a connection actor receives a datagram from its socket it will send * it to the handler designated in the [[Udp.Bind]] message using this message type. */ final case class Received(data: ByteString) extends Event /** * When a command fails it will be replied to with this message type, * wrapping the failing command object. */ final case class CommandFailed(cmd: Command) extends Event /** * This message is sent by the connection actor to the actor which sent the * [[Connect]] message when the UDP socket has been bound to the local and * remote addresses given. */ sealed trait Connected extends Event case object Connected extends Connected /** * This message is sent by the connection actor to the actor which sent the * [[Disconnect]] message when the UDP socket has been closed. */ sealed trait Disconnected extends Event case object Disconnected extends Disconnected } class UdpConnectedExt(system: ExtendedActorSystem) extends IO.Extension { val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-connected")) val manager: ActorRef = { system.systemActorOf( props = Props(classOf[UdpConnectedManager], this).withDeploy(Deploy.local), name = "IO-UDP-CONN") } /** * Java API: retrieve the UDP manager actor’s reference. */ def getManager: ActorRef = manager val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize) } /** * Java API: factory methods for the message types used when communicating with the UdpConnected service. */ object UdpConnectedMessage { import language.implicitConversions import UdpConnected._ /** * Send this message to the [[UdpExt#manager]] in order to bind to a local * port (optionally with the chosen `localAddress`) and create a UDP socket * which is restricted to sending to and receiving from the given `remoteAddress`. * All received datagrams will be sent to the designated `handler` actor. */ def connect(handler: ActorRef, remoteAddress: InetSocketAddress, localAddress: InetSocketAddress, options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, Some(localAddress), options) /** * Connect without specifying the `localAddress`. */ def connect(handler: ActorRef, remoteAddress: InetSocketAddress, options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, None, options) /** * Connect without specifying the `localAddress` or `options`. */ def connect(handler: ActorRef, remoteAddress: InetSocketAddress): Command = Connect(handler, remoteAddress, None, Nil) /** * This message is understood by the connection actors to send data to their * designated destination. The connection actor will respond with * [[UdpConnected.CommandFailed]] if the send could not be enqueued to the O/S kernel * because the send buffer was full. If the given `ack` is not of type [[UdpConnected.NoAck]] * the connection actor will reply with the given object as soon as the datagram * has been successfully enqueued to the O/S kernel. */ def send(data: ByteString, ack: AnyRef): Command = Send(data, ack) /** * Send without requesting acknowledgment. */ def send(data: ByteString): Command = Send(data) /** * Send this message to a connection actor (which had previously sent the * [[UdpConnected.Connected]] message) in order to close the socket. The connection actor * will reply with a [[UdpConnected.Disconnected]] message. */ def disconnect: Command = Disconnect /** * Each [[UdpConnected.Send]] can optionally request a positive acknowledgment to be sent * to the commanding actor. If such notification is not desired the [[UdpConnected.Send#ack]] * must be set to an instance of this class. The token contained within can be used * to recognize which write failed when receiving a [[UdpConnected.CommandFailed]] message. */ def noAck(token: AnyRef): NoAck = NoAck(token) /** * Default [[UdpConnected.NoAck]] instance which is used when no acknowledgment information is * explicitly provided. Its “token” is `null`. */ def noAck: NoAck = NoAck /** * Send this message to a listener actor (which sent a [[Udp.Bound]] message) to * have it stop reading datagrams from the network. If the O/S kernel’s receive * buffer runs full then subsequent datagrams will be silently discarded. * Re-enable reading from the socket using the [[UdpConnected.ResumeReading]] command. */ def suspendReading: Command = SuspendReading /** * This message must be sent to the listener actor to re-enable reading from * the socket after a [[UdpConnected.SuspendReading]] command. */ def resumeReading: Command = ResumeReading implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = { import scala.collection.JavaConverters._ coll.asScala.to[immutable.Traversable] } } Other Akka source code examplesHere is a short list of links related to this Akka UdpConnected.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.