|
Akka/Scala example source code file (SerializerSpec.scala)
The SerializerSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.serialization
import scala.collection.immutable
import com.typesafe.config._
import akka.actor._
import akka.persistence._
import akka.serialization._
import akka.testkit._
import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
object SerializerSpecConfigs {
val customSerializers = ConfigFactory.parseString(
"""
akka.actor {
serializers {
my-payload = "akka.persistence.serialization.MyPayloadSerializer"
my-snapshot = "akka.persistence.serialization.MySnapshotSerializer"
}
serialization-bindings {
"akka.persistence.serialization.MyPayload" = my-payload
"akka.persistence.serialization.MySnapshot" = my-snapshot
}
}
""")
val remote = ConfigFactory.parseString(
"""
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
loglevel = ERROR
log-dead-letters = 0
log-dead-letters-during-shutdown = off
}
""")
def config(configs: String*): Config =
configs.foldLeft(ConfigFactory.empty)((r, c) ⇒ r.withFallback(ConfigFactory.parseString(c)))
}
import SerializerSpecConfigs._
class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
val serialization = SerializationExtension(system)
"A snapshot serializer" must {
"handle custom snapshot Serialization" in {
val wrapped = Snapshot(MySnapshot("a"))
val serializer = serialization.findSerializerFor(wrapped)
val bytes = serializer.toBinary(wrapped)
val deserialized = serializer.fromBinary(bytes, None)
deserialized should be(Snapshot(MySnapshot(".a.")))
}
}
}
class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
val serialization = SerializationExtension(system)
"A message serializer" when {
"not given a manifest" must {
"handle custom ConfirmablePersistent message serialization" in {
val persistent = PersistentRepr(MyPayload("a"), 13, "p1", true, 3, List("c1", "c2"), confirmable = true, DeliveredByChannel("p2", "c2", 14), testActor, testActor)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
val deserialized = serializer.fromBinary(bytes, None)
deserialized should be(persistent.withPayload(MyPayload(".a.")))
}
"handle custom Persistent message serialization" in {
val persistent = PersistentRepr(MyPayload("a"), 13, "p1", true, 0, List("c1", "c2"), confirmable = false, DeliveredByChannel("p2", "c2", 14), testActor, testActor)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
val deserialized = serializer.fromBinary(bytes, None)
deserialized should be(persistent.withPayload(MyPayload(".a.")))
}
}
"given a PersistentRepr manifest" must {
"handle custom ConfirmablePersistent message serialization" in {
val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true, 3, List("c1", "c2"), confirmable = true, DeliveredByChannel("p2", "c2", 14), testActor, testActor)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
val deserialized = serializer.fromBinary(bytes, Some(classOf[PersistentRepr]))
deserialized should be(persistent.withPayload(MyPayload(".b.")))
}
"handle custom Persistent message serialization" in {
val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true, 3, List("c1", "c2"), confirmable = true, DeliveredByChannel("p2", "c2", 14), testActor, testActor)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
val deserialized = serializer.fromBinary(bytes, Some(classOf[PersistentRepr]))
deserialized should be(persistent.withPayload(MyPayload(".b.")))
}
}
"given a Confirm manifest" must {
"handle DeliveryByChannel message serialization" in {
val confirmation = DeliveredByChannel("p2", "c2", 14)
val serializer = serialization.findSerializerFor(confirmation)
val bytes = serializer.toBinary(confirmation)
val deserialized = serializer.fromBinary(bytes, Some(classOf[DeliveredByChannel]))
deserialized should be(confirmation)
}
"handle DeliveredByPersistentChannel message serialization" in {
val confirmation = DeliveredByPersistentChannel("c2", 14)
val serializer = serialization.findSerializerFor(confirmation)
val bytes = serializer.toBinary(confirmation)
val deserialized = serializer.fromBinary(bytes, Some(classOf[DeliveredByPersistentChannel]))
deserialized should be(confirmation)
}
}
"given AtLeastOnceDeliverySnapshot" must {
"handle empty unconfirmed" in {
val unconfirmed = Vector.empty
val snap = AtLeastOnceDeliverySnapshot(13, unconfirmed)
val serializer = serialization.findSerializerFor(snap)
val bytes = serializer.toBinary(snap)
val deserialized = serializer.fromBinary(bytes, Some(classOf[AtLeastOnceDeliverySnapshot]))
deserialized should be(snap)
}
"handle a few unconfirmed" in {
val unconfirmed = Vector(
UnconfirmedDelivery(deliveryId = 1, destination = testActor.path, "a"),
UnconfirmedDelivery(deliveryId = 2, destination = testActor.path, "b"),
UnconfirmedDelivery(deliveryId = 3, destination = testActor.path, 42))
val snap = AtLeastOnceDeliverySnapshot(17, unconfirmed)
val serializer = serialization.findSerializerFor(snap)
val bytes = serializer.toBinary(snap)
val deserialized = serializer.fromBinary(bytes, Some(classOf[AtLeastOnceDeliverySnapshot]))
deserialized should be(snap)
}
}
}
}
object MessageSerializerRemotingSpec {
class LocalActor(port: Int) extends Actor {
def receive = {
case m ⇒ context.actorSelection(s"akka.tcp://remote@127.0.0.1:${port}/user/remote") tell (m, sender())
}
}
class RemoteActor extends Actor {
def receive = {
case PersistentBatch(Persistent(MyPayload(data), _) +: tail) ⇒ sender() ! s"b${data}"
case ConfirmablePersistent(MyPayload(data), _, _) ⇒ sender() ! s"c${data}"
case Persistent(MyPayload(data), _) ⇒ sender() ! s"p${data}"
case DeliveredByChannel(pid, cid, msnr, dsnr, ep) ⇒ sender() ! s"${pid},${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}"
case DeliveredByPersistentChannel(cid, msnr, dsnr, ep) ⇒ sender() ! s"${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}"
case Deliver(Persistent(payload, _), dp) ⇒ context.actorSelection(dp) ! payload
}
}
def port(system: ActorSystem) =
address(system).port.get
def address(system: ActorSystem) =
system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
}
class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customSerializers)) with ImplicitSender with DefaultTimeout {
import MessageSerializerRemotingSpec._
val remoteSystem = ActorSystem("remote", remote.withFallback(customSerializers))
val localActor = system.actorOf(Props(classOf[LocalActor], port(remoteSystem)), "local")
override protected def atStartup() {
remoteSystem.actorOf(Props[RemoteActor], "remote")
}
override def afterTermination() {
remoteSystem.shutdown()
remoteSystem.awaitTermination()
}
"A message serializer" must {
"custom-serialize Persistent messages during remoting" in {
localActor ! Persistent(MyPayload("a"))
expectMsg("p.a.")
}
"custom-serialize ConfirmablePersistent messages during remoting" in {
localActor ! PersistentRepr(MyPayload("a"), confirmable = true)
expectMsg("c.a.")
}
"custom-serialize Persistent message batches during remoting" in {
localActor ! PersistentBatch(immutable.Seq(Persistent(MyPayload("a"))))
expectMsg("b.a.")
}
"serialize DeliveredByChannel messages during remoting" in {
localActor ! DeliveredByChannel("a", "b", 2, 3, testActor)
expectMsg("a,b,2,3,true")
}
"serialize DeliveredByPersistentChannel messages during remoting" in {
localActor ! DeliveredByPersistentChannel("c", 2, 3, testActor)
expectMsg("c,2,3,true")
}
"serialize Deliver messages during remoting" in {
localActor ! Deliver(Persistent("a"), ActorPath.fromString(testActor.path.toStringWithAddress(address(system))))
expectMsg("a")
}
}
}
final case class MyPayload(data: String)
final case class MySnapshot(data: String)
class MyPayloadSerializer extends Serializer {
val MyPayloadClass = classOf[MyPayload]
def identifier: Int = 77123
def includeManifest: Boolean = true
def toBinary(o: AnyRef): Array[Byte] = o match {
case MyPayload(data) ⇒ s".${data}".getBytes("UTF-8")
}
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
case Some(MyPayloadClass) ⇒ MyPayload(s"${new String(bytes, "UTF-8")}.")
case Some(c) ⇒ throw new Exception(s"unexpected manifest ${c}")
case None ⇒ throw new Exception("no manifest")
}
}
class MySnapshotSerializer extends Serializer {
val MySnapshotClass = classOf[MySnapshot]
def identifier: Int = 77124
def includeManifest: Boolean = true
def toBinary(o: AnyRef): Array[Byte] = o match {
case MySnapshot(data) ⇒ s".${data}".getBytes("UTF-8")
}
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
case Some(MySnapshotClass) ⇒ MySnapshot(s"${new String(bytes, "UTF-8")}.")
case Some(c) ⇒ throw new Exception(s"unexpected manifest ${c}")
case None ⇒ throw new Exception("no manifest")
}
}
Other Akka source code examplesHere is a short list of links related to this Akka SerializerSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.