| career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (AkkaPduCodec.scala)

This example Akka source code file (AkkaPduCodec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, address, akka, akkapdu, array, byte, bytestring, none, option, pducodecexception, remote, reuse, some, transport, util

The AkkaPduCodec.scala Akka example source code

 * Copyright (C) 2009-2014 Typesafe Inc. <>
package akka.remote.transport

import akka.AkkaException
import{ AddressFromURIString, InternalActorRef, Address, ActorRef }
import akka.remote.WireFormats._
import akka.remote._
import akka.util.ByteString
import{ ByteString ⇒ PByteString }

private[remote] class PduCodecException(msg: String, cause: Throwable) extends AkkaException(msg, cause)

 * Companion object of the [[akka.remote.transport.AkkaPduCodec]] trait. Contains the representation case classes
 * of decoded Akka Protocol Data Units (PDUs).
private[remote] object AkkaPduCodec {

   * Trait that represents decoded Akka PDUs (Protocol Data Units)
  sealed trait AkkaPdu
  final case class Associate(info: HandshakeInfo) extends AkkaPdu
  final case class Disassociate(reason: AssociationHandle.DisassociateInfo) extends AkkaPdu
  case object Heartbeat extends AkkaPdu
  final case class Payload(bytes: ByteString) extends AkkaPdu

  final case class Message(recipient: InternalActorRef,
                           recipientAddress: Address,
                           serializedMessage: SerializedMessage,
                           senderOption: Option[ActorRef],
                           seqOption: Option[SeqNo]) extends HasSequenceNumber {

    def reliableDeliveryEnabled = seqOption.isDefined

    override def seq: SeqNo = seqOption.get

 * A Codec that is able to convert Akka PDUs (Protocol Data Units) from and to [[akka.util.ByteString]]s.
private[remote] trait AkkaPduCodec {
  import AkkaPduCodec._
   * Returns an [[akka.remote.transport.AkkaPduCodec.AkkaPdu]] instance that represents the PDU contained in the raw
   * ByteString.
   * @param raw
   *   Encoded raw byte representation of an Akka PDU
   * @return
   *   Case class representation of the decoded PDU that can be used in a match statement
  def decodePdu(raw: ByteString): AkkaPdu

   * Takes an [[akka.remote.transport.AkkaPduCodec.AkkaPdu]] representation of an Akka PDU and returns its encoded
   * form as a [[akka.util.ByteString]].
   * For the same effect the constructXXX methods might be called directly, taking method parameters instead of the
   * [[akka.remote.transport.AkkaPduCodec.AkkaPdu]] final case classes.
   * @param pdu
   *   The Akka Protocol Data Unit to be encoded
   * @return
   *   Encoded form as raw bytes
  def encodePdu(pdu: AkkaPdu): ByteString = pdu match {
    case Associate(info)      ⇒ constructAssociate(info)
    case Payload(bytes)       ⇒ constructPayload(bytes)
    case Disassociate(reason) ⇒ constructDisassociate(reason)
    case Heartbeat            ⇒ constructHeartbeat

  def constructPayload(payload: ByteString): ByteString

  def constructAssociate(info: HandshakeInfo): ByteString

  def constructDisassociate(reason: AssociationHandle.DisassociateInfo): ByteString

  def constructHeartbeat: ByteString

  def decodeMessage(raw: ByteString, provider: RemoteActorRefProvider, localAddress: Address): (Option[Ack], Option[Message])

  def constructMessage(
    localAddress: Address,
    recipient: ActorRef,
    serializedMessage: SerializedMessage,
    senderOption: Option[ActorRef],
    seqOption: Option[SeqNo] = None,
    ackOption: Option[Ack] = None): ByteString

  def constructPureAck(ack: Ack): ByteString

private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
  import AkkaPduCodec._

  private def ackBuilder(ack: Ack): AcknowledgementInfo.Builder = {
    val ackBuilder = AcknowledgementInfo.newBuilder()
    ack.nacks foreach { nack ⇒ ackBuilder.addNacks(nack.rawValue) }

  override def constructMessage(
    localAddress: Address,
    recipient: ActorRef,
    serializedMessage: SerializedMessage,
    senderOption: Option[ActorRef],
    seqOption: Option[SeqNo] = None,
    ackOption: Option[Ack] = None): ByteString = {

    val ackAndEnvelopeBuilder = AckAndEnvelopeContainer.newBuilder

    val envelopeBuilder = RemoteEnvelope.newBuilder

    envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient))
    senderOption foreach { ref ⇒ envelopeBuilder.setSender(serializeActorRef(localAddress, ref)) }
    seqOption foreach { seq ⇒ envelopeBuilder.setSeq(seq.rawValue) }
    ackOption foreach { ack ⇒ ackAndEnvelopeBuilder.setAck(ackBuilder(ack)) }

    ByteString.ByteString1C( //Reuse Byte Array (naughty!)

  override def constructPureAck(ack: Ack): ByteString =
    ByteString.ByteString1C(AckAndEnvelopeContainer.newBuilder.setAck(ackBuilder(ack)).build().toByteArray) //Reuse Byte Array (naughty!)

  override def constructPayload(payload: ByteString): ByteString =
    ByteString.ByteString1C(AkkaProtocolMessage.newBuilder().setPayload(PByteString.copyFrom(payload.asByteBuffer)).build.toByteArray) //Reuse Byte Array (naughty!)

  override def constructAssociate(info: HandshakeInfo): ByteString = {
    val handshakeInfo = AkkaHandshakeInfo.newBuilder.setOrigin(serializeAddress(info.origin)).setUid(info.uid)
    info.cookie foreach handshakeInfo.setCookie
    constructControlMessagePdu(WireFormats.CommandType.ASSOCIATE, Some(handshakeInfo))

  private val DISASSOCIATE = constructControlMessagePdu(WireFormats.CommandType.DISASSOCIATE, None)
  private val DISASSOCIATE_SHUTTING_DOWN = constructControlMessagePdu(WireFormats.CommandType.DISASSOCIATE_SHUTTING_DOWN, None)
  private val DISASSOCIATE_QUARANTINED = constructControlMessagePdu(WireFormats.CommandType.DISASSOCIATE_QUARANTINED, None)

  override def constructDisassociate(info: AssociationHandle.DisassociateInfo): ByteString = info match {
    case AssociationHandle.Unknown     ⇒ DISASSOCIATE
    case AssociationHandle.Shutdown    ⇒ DISASSOCIATE_SHUTTING_DOWN
    case AssociationHandle.Quarantined ⇒ DISASSOCIATE_QUARANTINED

  override val constructHeartbeat: ByteString =
    constructControlMessagePdu(WireFormats.CommandType.HEARTBEAT, None)

  override def decodePdu(raw: ByteString): AkkaPdu = {
    try {
      val pdu = AkkaProtocolMessage.parseFrom(raw.toArray)
      if (pdu.hasPayload) Payload(ByteString(pdu.getPayload.asReadOnlyByteBuffer()))
      else if (pdu.hasInstruction) decodeControlPdu(pdu.getInstruction)
      else throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained", null)
    } catch {
      case e: InvalidProtocolBufferException ⇒ throw new PduCodecException("Decoding PDU failed.", e)

  override def decodeMessage(
    raw: ByteString,
    provider: RemoteActorRefProvider,
    localAddress: Address): (Option[Ack], Option[Message]) = {
    val ackAndEnvelope = AckAndEnvelopeContainer.parseFrom(raw.toArray)

    val ackOption = if (ackAndEnvelope.hasAck) {
      import scala.collection.JavaConverters._
    } else None

    val messageOption = if (ackAndEnvelope.hasEnvelope) {
      val msgPdu = ackAndEnvelope.getEnvelope
        recipient = provider.resolveActorRefWithLocalAddress(msgPdu.getRecipient.getPath, localAddress),
        recipientAddress = AddressFromURIString(msgPdu.getRecipient.getPath),
        serializedMessage = msgPdu.getMessage,
        senderOption =
          if (msgPdu.hasSender) Some(provider.resolveActorRefWithLocalAddress(msgPdu.getSender.getPath, localAddress))
          else None,
        seqOption =
          if (msgPdu.hasSeq) Some(SeqNo(msgPdu.getSeq)) else None))
    } else None

    (ackOption, messageOption)

  private def decodeControlPdu(controlPdu: AkkaControlMessage): AkkaPdu = {

    controlPdu.getCommandType match {
      case CommandType.ASSOCIATE if controlPdu.hasHandshakeInfo ⇒
        val handshakeInfo = controlPdu.getHandshakeInfo
        val cookie = if (handshakeInfo.hasCookie) Some(handshakeInfo.getCookie) else None
            handshakeInfo.getUid.toInt, // 64 bits are allocated in the wire formats, but we use only 32 for now
      case CommandType.DISASSOCIATE               ⇒ Disassociate(AssociationHandle.Unknown)
      case CommandType.DISASSOCIATE_SHUTTING_DOWN ⇒ Disassociate(AssociationHandle.Shutdown)
      case CommandType.DISASSOCIATE_QUARANTINED   ⇒ Disassociate(AssociationHandle.Quarantined)
      case CommandType.HEARTBEAT                  ⇒ Heartbeat
      case x ⇒
        throw new PduCodecException(s"Decoding of control PDU failed, invalid format, unexpected: [${x}]", null)

  private def decodeAddress(encodedAddress: AddressData): Address =
    Address(encodedAddress.getProtocol, encodedAddress.getSystem, encodedAddress.getHostname, encodedAddress.getPort)

  private def constructControlMessagePdu(
    code: WireFormats.CommandType,
    handshakeInfo: Option[AkkaHandshakeInfo.Builder]): ByteString = {

    val controlMessageBuilder = AkkaControlMessage.newBuilder()
    handshakeInfo foreach controlMessageBuilder.setHandshakeInfo

    ByteString.ByteString1C(AkkaProtocolMessage.newBuilder().setInstruction( //Reuse Byte Array (naughty!)

  private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefData = {
      if ( ref.path.toSerializationFormat else ref.path.toSerializationFormatWithAddress(defaultAddress)).build()

  private def serializeAddress(address: Address): AddressData = address match {
    case Address(protocol, system, Some(host), Some(port)) ⇒
    case _ ⇒ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.")


Other Akka source code examples

Here is a short list of links related to this Akka AkkaPduCodec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller


new blog posts


Copyright 1998-2021 Alvin Alexander,
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.