alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (AkkaPduCodec.scala)

This example Akka source code file (AkkaPduCodec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, address, akka, akkapdu, array, byte, bytestring, none, option, pducodecexception, remote, reuse, some, transport, util

The AkkaPduCodec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.remote.transport

import akka.AkkaException
import akka.actor.{ AddressFromURIString, InternalActorRef, Address, ActorRef }
import akka.remote.WireFormats._
import akka.remote._
import akka.util.ByteString
import com.google.protobuf.InvalidProtocolBufferException
import com.google.protobuf.{ ByteString ⇒ PByteString }

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] class PduCodecException(msg: String, cause: Throwable) extends AkkaException(msg, cause)

/**
 * INTERNAL API
 *
 * Companion object of the [[akka.remote.transport.AkkaPduCodec]] trait. Contains the representation case classes
 * of decoded Akka Protocol Data Units (PDUs).
 */
private[remote] object AkkaPduCodec {

  /**
   * Trait that represents decoded Akka PDUs (Protocol Data Units)
   */
  sealed trait AkkaPdu
  final case class Associate(info: HandshakeInfo) extends AkkaPdu
  final case class Disassociate(reason: AssociationHandle.DisassociateInfo) extends AkkaPdu
  case object Heartbeat extends AkkaPdu
  final case class Payload(bytes: ByteString) extends AkkaPdu

  final case class Message(recipient: InternalActorRef,
                           recipientAddress: Address,
                           serializedMessage: SerializedMessage,
                           senderOption: Option[ActorRef],
                           seqOption: Option[SeqNo]) extends HasSequenceNumber {

    def reliableDeliveryEnabled = seqOption.isDefined

    override def seq: SeqNo = seqOption.get
  }
}

/**
 * INTERNAL API
 *
 * A Codec that is able to convert Akka PDUs (Protocol Data Units) from and to [[akka.util.ByteString]]s.
 */
private[remote] trait AkkaPduCodec {
  import AkkaPduCodec._
  /**
   * Returns an [[akka.remote.transport.AkkaPduCodec.AkkaPdu]] instance that represents the PDU contained in the raw
   * ByteString.
   * @param raw
   *   Encoded raw byte representation of an Akka PDU
   * @return
   *   Case class representation of the decoded PDU that can be used in a match statement
   */
  def decodePdu(raw: ByteString): AkkaPdu

  /**
   * Takes an [[akka.remote.transport.AkkaPduCodec.AkkaPdu]] representation of an Akka PDU and returns its encoded
   * form as a [[akka.util.ByteString]].
   *
   * For the same effect the constructXXX methods might be called directly, taking method parameters instead of the
   * [[akka.remote.transport.AkkaPduCodec.AkkaPdu]] final case classes.
   *
   * @param pdu
   *   The Akka Protocol Data Unit to be encoded
   * @return
   *   Encoded form as raw bytes
   */
  def encodePdu(pdu: AkkaPdu): ByteString = pdu match {
    case Associate(info)      ⇒ constructAssociate(info)
    case Payload(bytes)       ⇒ constructPayload(bytes)
    case Disassociate(reason) ⇒ constructDisassociate(reason)
    case Heartbeat            ⇒ constructHeartbeat
  }

  def constructPayload(payload: ByteString): ByteString

  def constructAssociate(info: HandshakeInfo): ByteString

  def constructDisassociate(reason: AssociationHandle.DisassociateInfo): ByteString

  def constructHeartbeat: ByteString

  def decodeMessage(raw: ByteString, provider: RemoteActorRefProvider, localAddress: Address): (Option[Ack], Option[Message])

  def constructMessage(
    localAddress: Address,
    recipient: ActorRef,
    serializedMessage: SerializedMessage,
    senderOption: Option[ActorRef],
    seqOption: Option[SeqNo] = None,
    ackOption: Option[Ack] = None): ByteString

  def constructPureAck(ack: Ack): ByteString
}

/**
 * INTERNAL API
 */
private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
  import AkkaPduCodec._

  private def ackBuilder(ack: Ack): AcknowledgementInfo.Builder = {
    val ackBuilder = AcknowledgementInfo.newBuilder()
    ackBuilder.setCumulativeAck(ack.cumulativeAck.rawValue)
    ack.nacks foreach { nack ⇒ ackBuilder.addNacks(nack.rawValue) }
    ackBuilder
  }

  override def constructMessage(
    localAddress: Address,
    recipient: ActorRef,
    serializedMessage: SerializedMessage,
    senderOption: Option[ActorRef],
    seqOption: Option[SeqNo] = None,
    ackOption: Option[Ack] = None): ByteString = {

    val ackAndEnvelopeBuilder = AckAndEnvelopeContainer.newBuilder

    val envelopeBuilder = RemoteEnvelope.newBuilder

    envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient))
    senderOption foreach { ref ⇒ envelopeBuilder.setSender(serializeActorRef(localAddress, ref)) }
    seqOption foreach { seq ⇒ envelopeBuilder.setSeq(seq.rawValue) }
    ackOption foreach { ack ⇒ ackAndEnvelopeBuilder.setAck(ackBuilder(ack)) }
    envelopeBuilder.setMessage(serializedMessage)
    ackAndEnvelopeBuilder.setEnvelope(envelopeBuilder)

    ByteString.ByteString1C(ackAndEnvelopeBuilder.build.toByteArray) //Reuse Byte Array (naughty!)
  }

  override def constructPureAck(ack: Ack): ByteString =
    ByteString.ByteString1C(AckAndEnvelopeContainer.newBuilder.setAck(ackBuilder(ack)).build().toByteArray) //Reuse Byte Array (naughty!)

  override def constructPayload(payload: ByteString): ByteString =
    ByteString.ByteString1C(AkkaProtocolMessage.newBuilder().setPayload(PByteString.copyFrom(payload.asByteBuffer)).build.toByteArray) //Reuse Byte Array (naughty!)

  override def constructAssociate(info: HandshakeInfo): ByteString = {
    val handshakeInfo = AkkaHandshakeInfo.newBuilder.setOrigin(serializeAddress(info.origin)).setUid(info.uid)
    info.cookie foreach handshakeInfo.setCookie
    constructControlMessagePdu(WireFormats.CommandType.ASSOCIATE, Some(handshakeInfo))
  }

  private val DISASSOCIATE = constructControlMessagePdu(WireFormats.CommandType.DISASSOCIATE, None)
  private val DISASSOCIATE_SHUTTING_DOWN = constructControlMessagePdu(WireFormats.CommandType.DISASSOCIATE_SHUTTING_DOWN, None)
  private val DISASSOCIATE_QUARANTINED = constructControlMessagePdu(WireFormats.CommandType.DISASSOCIATE_QUARANTINED, None)

  override def constructDisassociate(info: AssociationHandle.DisassociateInfo): ByteString = info match {
    case AssociationHandle.Unknown     ⇒ DISASSOCIATE
    case AssociationHandle.Shutdown    ⇒ DISASSOCIATE_SHUTTING_DOWN
    case AssociationHandle.Quarantined ⇒ DISASSOCIATE_QUARANTINED
  }

  override val constructHeartbeat: ByteString =
    constructControlMessagePdu(WireFormats.CommandType.HEARTBEAT, None)

  override def decodePdu(raw: ByteString): AkkaPdu = {
    try {
      val pdu = AkkaProtocolMessage.parseFrom(raw.toArray)
      if (pdu.hasPayload) Payload(ByteString(pdu.getPayload.asReadOnlyByteBuffer()))
      else if (pdu.hasInstruction) decodeControlPdu(pdu.getInstruction)
      else throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained", null)
    } catch {
      case e: InvalidProtocolBufferException ⇒ throw new PduCodecException("Decoding PDU failed.", e)
    }
  }

  override def decodeMessage(
    raw: ByteString,
    provider: RemoteActorRefProvider,
    localAddress: Address): (Option[Ack], Option[Message]) = {
    val ackAndEnvelope = AckAndEnvelopeContainer.parseFrom(raw.toArray)

    val ackOption = if (ackAndEnvelope.hasAck) {
      import scala.collection.JavaConverters._
      Some(Ack(SeqNo(ackAndEnvelope.getAck.getCumulativeAck), ackAndEnvelope.getAck.getNacksList.asScala.map(SeqNo(_)).toSet))
    } else None

    val messageOption = if (ackAndEnvelope.hasEnvelope) {
      val msgPdu = ackAndEnvelope.getEnvelope
      Some(Message(
        recipient = provider.resolveActorRefWithLocalAddress(msgPdu.getRecipient.getPath, localAddress),
        recipientAddress = AddressFromURIString(msgPdu.getRecipient.getPath),
        serializedMessage = msgPdu.getMessage,
        senderOption =
          if (msgPdu.hasSender) Some(provider.resolveActorRefWithLocalAddress(msgPdu.getSender.getPath, localAddress))
          else None,
        seqOption =
          if (msgPdu.hasSeq) Some(SeqNo(msgPdu.getSeq)) else None))
    } else None

    (ackOption, messageOption)
  }

  private def decodeControlPdu(controlPdu: AkkaControlMessage): AkkaPdu = {

    controlPdu.getCommandType match {
      case CommandType.ASSOCIATE if controlPdu.hasHandshakeInfo ⇒
        val handshakeInfo = controlPdu.getHandshakeInfo
        val cookie = if (handshakeInfo.hasCookie) Some(handshakeInfo.getCookie) else None
        Associate(
          HandshakeInfo(
            decodeAddress(handshakeInfo.getOrigin),
            handshakeInfo.getUid.toInt, // 64 bits are allocated in the wire formats, but we use only 32 for now
            cookie))
      case CommandType.DISASSOCIATE               ⇒ Disassociate(AssociationHandle.Unknown)
      case CommandType.DISASSOCIATE_SHUTTING_DOWN ⇒ Disassociate(AssociationHandle.Shutdown)
      case CommandType.DISASSOCIATE_QUARANTINED   ⇒ Disassociate(AssociationHandle.Quarantined)
      case CommandType.HEARTBEAT                  ⇒ Heartbeat
      case x ⇒
        throw new PduCodecException(s"Decoding of control PDU failed, invalid format, unexpected: [${x}]", null)
    }
  }

  private def decodeAddress(encodedAddress: AddressData): Address =
    Address(encodedAddress.getProtocol, encodedAddress.getSystem, encodedAddress.getHostname, encodedAddress.getPort)

  private def constructControlMessagePdu(
    code: WireFormats.CommandType,
    handshakeInfo: Option[AkkaHandshakeInfo.Builder]): ByteString = {

    val controlMessageBuilder = AkkaControlMessage.newBuilder()
    controlMessageBuilder.setCommandType(code)
    handshakeInfo foreach controlMessageBuilder.setHandshakeInfo

    ByteString.ByteString1C(AkkaProtocolMessage.newBuilder().setInstruction(controlMessageBuilder.build).build.toByteArray) //Reuse Byte Array (naughty!)
  }

  private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefData = {
    ActorRefData.newBuilder.setPath(
      if (ref.path.address.host.isDefined) ref.path.toSerializationFormat else ref.path.toSerializationFormatWithAddress(defaultAddress)).build()
  }

  private def serializeAddress(address: Address): AddressData = address match {
    case Address(protocol, system, Some(host), Some(port)) ⇒
      AddressData.newBuilder
        .setHostname(host)
        .setPort(port)
        .setSystem(system)
        .setProtocol(protocol)
        .build()
    case _ ⇒ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.")
  }

}

Other Akka source code examples

Here is a short list of links related to this Akka AkkaPduCodec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.