alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (SerializerSpec.scala)

This example Akka source code file (SerializerSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

akka, array, deliveredbychannel, deliveredbypersistentchannel, list, none, persistence, persistent, persistentrepr, serialization, serialize, some, string, test, testing, utf-8

The SerializerSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.persistence.serialization

import scala.collection.immutable
import com.typesafe.config._
import akka.actor._
import akka.persistence._
import akka.serialization._
import akka.testkit._

import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery

object SerializerSpecConfigs {
  val customSerializers = ConfigFactory.parseString(
    """
      akka.actor {
        serializers {
          my-payload = "akka.persistence.serialization.MyPayloadSerializer"
          my-snapshot = "akka.persistence.serialization.MySnapshotSerializer"
        }
        serialization-bindings {
          "akka.persistence.serialization.MyPayload" = my-payload
          "akka.persistence.serialization.MySnapshot" = my-snapshot
        }
      }
    """)

  val remote = ConfigFactory.parseString(
    """
      akka {
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
        remote {
          enabled-transports = ["akka.remote.netty.tcp"]
          netty.tcp {
            hostname = "127.0.0.1"
            port = 0
          }
        }
        loglevel = ERROR
        log-dead-letters = 0
        log-dead-letters-during-shutdown = off
      }
    """)

  def config(configs: String*): Config =
    configs.foldLeft(ConfigFactory.empty)((r, c) ⇒ r.withFallback(ConfigFactory.parseString(c)))
}

import SerializerSpecConfigs._

class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
  val serialization = SerializationExtension(system)

  "A snapshot serializer" must {
    "handle custom snapshot Serialization" in {
      val wrapped = Snapshot(MySnapshot("a"))
      val serializer = serialization.findSerializerFor(wrapped)

      val bytes = serializer.toBinary(wrapped)
      val deserialized = serializer.fromBinary(bytes, None)

      deserialized should be(Snapshot(MySnapshot(".a.")))
    }
  }
}

class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
  val serialization = SerializationExtension(system)

  "A message serializer" when {
    "not given a manifest" must {
      "handle custom ConfirmablePersistent message serialization" in {
        val persistent = PersistentRepr(MyPayload("a"), 13, "p1", true, 3, List("c1", "c2"), confirmable = true, DeliveredByChannel("p2", "c2", 14), testActor, testActor)
        val serializer = serialization.findSerializerFor(persistent)

        val bytes = serializer.toBinary(persistent)
        val deserialized = serializer.fromBinary(bytes, None)

        deserialized should be(persistent.withPayload(MyPayload(".a.")))
      }
      "handle custom Persistent message serialization" in {
        val persistent = PersistentRepr(MyPayload("a"), 13, "p1", true, 0, List("c1", "c2"), confirmable = false, DeliveredByChannel("p2", "c2", 14), testActor, testActor)
        val serializer = serialization.findSerializerFor(persistent)

        val bytes = serializer.toBinary(persistent)
        val deserialized = serializer.fromBinary(bytes, None)

        deserialized should be(persistent.withPayload(MyPayload(".a.")))
      }
    }
    "given a PersistentRepr manifest" must {
      "handle custom ConfirmablePersistent message serialization" in {
        val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true, 3, List("c1", "c2"), confirmable = true, DeliveredByChannel("p2", "c2", 14), testActor, testActor)
        val serializer = serialization.findSerializerFor(persistent)

        val bytes = serializer.toBinary(persistent)
        val deserialized = serializer.fromBinary(bytes, Some(classOf[PersistentRepr]))

        deserialized should be(persistent.withPayload(MyPayload(".b.")))
      }
      "handle custom Persistent message serialization" in {
        val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true, 3, List("c1", "c2"), confirmable = true, DeliveredByChannel("p2", "c2", 14), testActor, testActor)
        val serializer = serialization.findSerializerFor(persistent)

        val bytes = serializer.toBinary(persistent)
        val deserialized = serializer.fromBinary(bytes, Some(classOf[PersistentRepr]))

        deserialized should be(persistent.withPayload(MyPayload(".b.")))
      }
    }
    "given a Confirm manifest" must {
      "handle DeliveryByChannel message serialization" in {
        val confirmation = DeliveredByChannel("p2", "c2", 14)
        val serializer = serialization.findSerializerFor(confirmation)

        val bytes = serializer.toBinary(confirmation)
        val deserialized = serializer.fromBinary(bytes, Some(classOf[DeliveredByChannel]))

        deserialized should be(confirmation)
      }
      "handle DeliveredByPersistentChannel message serialization" in {
        val confirmation = DeliveredByPersistentChannel("c2", 14)
        val serializer = serialization.findSerializerFor(confirmation)

        val bytes = serializer.toBinary(confirmation)
        val deserialized = serializer.fromBinary(bytes, Some(classOf[DeliveredByPersistentChannel]))

        deserialized should be(confirmation)
      }
    }

    "given AtLeastOnceDeliverySnapshot" must {
      "handle empty unconfirmed" in {
        val unconfirmed = Vector.empty
        val snap = AtLeastOnceDeliverySnapshot(13, unconfirmed)
        val serializer = serialization.findSerializerFor(snap)

        val bytes = serializer.toBinary(snap)
        val deserialized = serializer.fromBinary(bytes, Some(classOf[AtLeastOnceDeliverySnapshot]))

        deserialized should be(snap)
      }

      "handle a few unconfirmed" in {
        val unconfirmed = Vector(
          UnconfirmedDelivery(deliveryId = 1, destination = testActor.path, "a"),
          UnconfirmedDelivery(deliveryId = 2, destination = testActor.path, "b"),
          UnconfirmedDelivery(deliveryId = 3, destination = testActor.path, 42))
        val snap = AtLeastOnceDeliverySnapshot(17, unconfirmed)
        val serializer = serialization.findSerializerFor(snap)

        val bytes = serializer.toBinary(snap)
        val deserialized = serializer.fromBinary(bytes, Some(classOf[AtLeastOnceDeliverySnapshot]))

        deserialized should be(snap)
      }

    }

  }
}

object MessageSerializerRemotingSpec {
  class LocalActor(port: Int) extends Actor {
    def receive = {
      case m ⇒ context.actorSelection(s"akka.tcp://remote@127.0.0.1:${port}/user/remote") tell (m, sender())
    }
  }

  class RemoteActor extends Actor {
    def receive = {
      case PersistentBatch(Persistent(MyPayload(data), _) +: tail) ⇒ sender() ! s"b${data}"
      case ConfirmablePersistent(MyPayload(data), _, _)            ⇒ sender() ! s"c${data}"
      case Persistent(MyPayload(data), _)                          ⇒ sender() ! s"p${data}"
      case DeliveredByChannel(pid, cid, msnr, dsnr, ep)            ⇒ sender() ! s"${pid},${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}"
      case DeliveredByPersistentChannel(cid, msnr, dsnr, ep)       ⇒ sender() ! s"${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}"
      case Deliver(Persistent(payload, _), dp)                     ⇒ context.actorSelection(dp) ! payload
    }
  }

  def port(system: ActorSystem) =
    address(system).port.get

  def address(system: ActorSystem) =
    system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
}

class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customSerializers)) with ImplicitSender with DefaultTimeout {
  import MessageSerializerRemotingSpec._

  val remoteSystem = ActorSystem("remote", remote.withFallback(customSerializers))
  val localActor = system.actorOf(Props(classOf[LocalActor], port(remoteSystem)), "local")

  override protected def atStartup() {
    remoteSystem.actorOf(Props[RemoteActor], "remote")
  }

  override def afterTermination() {
    remoteSystem.shutdown()
    remoteSystem.awaitTermination()
  }

  "A message serializer" must {
    "custom-serialize Persistent messages during remoting" in {
      localActor ! Persistent(MyPayload("a"))
      expectMsg("p.a.")
    }
    "custom-serialize ConfirmablePersistent messages during remoting" in {
      localActor ! PersistentRepr(MyPayload("a"), confirmable = true)
      expectMsg("c.a.")
    }
    "custom-serialize Persistent message batches during remoting" in {
      localActor ! PersistentBatch(immutable.Seq(Persistent(MyPayload("a"))))
      expectMsg("b.a.")
    }
    "serialize DeliveredByChannel messages during remoting" in {
      localActor ! DeliveredByChannel("a", "b", 2, 3, testActor)
      expectMsg("a,b,2,3,true")
    }
    "serialize DeliveredByPersistentChannel messages during remoting" in {
      localActor ! DeliveredByPersistentChannel("c", 2, 3, testActor)
      expectMsg("c,2,3,true")
    }
    "serialize Deliver messages during remoting" in {
      localActor ! Deliver(Persistent("a"), ActorPath.fromString(testActor.path.toStringWithAddress(address(system))))
      expectMsg("a")
    }
  }
}

final case class MyPayload(data: String)
final case class MySnapshot(data: String)

class MyPayloadSerializer extends Serializer {
  val MyPayloadClass = classOf[MyPayload]

  def identifier: Int = 77123
  def includeManifest: Boolean = true

  def toBinary(o: AnyRef): Array[Byte] = o match {
    case MyPayload(data) ⇒ s".${data}".getBytes("UTF-8")
  }

  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
    case Some(MyPayloadClass) ⇒ MyPayload(s"${new String(bytes, "UTF-8")}.")
    case Some(c)              ⇒ throw new Exception(s"unexpected manifest ${c}")
    case None                 ⇒ throw new Exception("no manifest")
  }
}

class MySnapshotSerializer extends Serializer {
  val MySnapshotClass = classOf[MySnapshot]

  def identifier: Int = 77124
  def includeManifest: Boolean = true

  def toBinary(o: AnyRef): Array[Byte] = o match {
    case MySnapshot(data) ⇒ s".${data}".getBytes("UTF-8")
  }

  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
    case Some(MySnapshotClass) ⇒ MySnapshot(s"${new String(bytes, "UTF-8")}.")
    case Some(c)               ⇒ throw new Exception(s"unexpected manifest ${c}")
    case None                  ⇒ throw new Exception("no manifest")
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka SerializerSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.