|
Akka/Scala example source code file (SnapshotSerializer.scala)
// The SnapshotSerializer.scala Akka example source code
/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 * Copyright (C) 2012-2013 Eligotech BV.
 */

package akka.persistence.serialization

import java.io._

import akka.actor._
import akka.serialization.{ Serializer, SerializationExtension }
import akka.serialization.Serialization

/**
 * Wrapper for snapshot `data`. Snapshot `data` are the actual snapshot objects captured by
 * a [[Processor]].
 *
 * @see [[SnapshotSerializer]]
 */
@SerialVersionUID(1L)
final case class Snapshot(data: Any)

/**
 * INTERNAL API.
 *
 * Header stored in front of the serialized snapshot data. It records which serializer
 * produced the data and, when that serializer asked for one, the class name used as manifest.
 */
@SerialVersionUID(1L)
private[serialization] final case class SnapshotHeader(serializerId: Int, manifest: Option[String])

/**
 * [[Snapshot]] serializer.
 *
 * Binary layout produced by [[toBinary]] and expected by [[fromBinary]]:
 *
 * {{{
 * [header length: 4 bytes, least significant byte first][serialized SnapshotHeader][serialized snapshot data]
 * }}}
 */
class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer {

  /** Number of bytes used by the header-length prefix. */
  private final val LengthPrefixSize = 4

  def identifier: Int = 8
  def includeManifest: Boolean = false

  /**
   * Transport information installed while serializing, so that actor references are written
   * with full address information. `None` when the default address has local scope.
   */
  private lazy val transportInformation: Option[Serialization.Information] = {
    val address = system.provider.getDefaultAddress
    if (address.hasLocalScope) None
    else Some(Serialization.Information(address, system))
  }

  /**
   * Serializes a [[Snapshot]]. Delegates serialization of snapshot `data` to a matching
   * `akka.serialization.Serializer`.
   *
   * @param o the object to serialize; must be a [[Snapshot]]
   * @return the header-prefixed binary representation of the snapshot data
   * @throws IllegalArgumentException if `o` is `null` or not a [[Snapshot]]
   */
  def toBinary(o: AnyRef): Array[Byte] = o match {
    case Snapshot(data) ⇒ snapshotToBinary(data.asInstanceOf[AnyRef])
    // `null` gets its own case: the generic branch below calls `o.getClass`, which would
    // fail with a NullPointerException instead of the intended IllegalArgumentException.
    case null           ⇒ throw new IllegalArgumentException("Can't serialize null")
    case _              ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
  }

  /**
   * Deserializes a [[Snapshot]]. Delegates deserialization of snapshot `data` to a matching
   * `akka.serialization.Serializer`.
   *
   * @param bytes    binary representation as produced by [[toBinary]]
   * @param manifest not used; the manifest of the snapshot data is read from the embedded header
   * @return a [[Snapshot]] wrapping the deserialized snapshot data
   * @throws IllegalArgumentException if `bytes` is too short to hold the length prefix or the
   *                                  header length it announces
   */
  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
    Snapshot(snapshotFromBinary(bytes))

  /**
   * Writes the length-prefixed [[SnapshotHeader]] followed by the serialized `snapshot`.
   */
  private def snapshotToBinary(snapshot: AnyRef): Array[Byte] = {
    def serialize(): Array[Byte] = {
      val extension = SerializationExtension(system)

      val snapshotSerializer = extension.findSerializerFor(snapshot)
      val snapshotManifest =
        if (snapshotSerializer.includeManifest) Some(snapshot.getClass.getName) else None

      val header = SnapshotHeader(snapshotSerializer.identifier, snapshotManifest)
      val headerSerializer = extension.findSerializerFor(header)
      val headerBytes = headerSerializer.toBinary(header)

      val out = new ByteArrayOutputStream
      writeInt(out, headerBytes.length)
      out.write(headerBytes)
      out.write(snapshotSerializer.toBinary(snapshot))
      out.toByteArray
    }

    // serialize actor references with full address information (defaultAddress)
    transportInformation match {
      case Some(ti) ⇒ Serialization.currentTransportInformation.withValue(ti) { serialize() }
      case None     ⇒ serialize()
    }
  }

  /**
   * Splits `bytes` into header and snapshot data, then deserializes the data with the
   * serializer id and manifest recorded in the header.
   */
  private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = {
    val extension = SerializationExtension(system)

    val headerLength = readInt(new ByteArrayInputStream(bytes))
    // readInt succeeded, so at least LengthPrefixSize bytes exist and this is >= 0.
    val remaining = bytes.length - LengthPrefixSize
    // Reject corrupt prefixes up front: `slice`/`drop` would silently clamp a negative or
    // oversized length and hand a truncated header to the header deserializer.
    if (headerLength < 0 || headerLength > remaining)
      throw new IllegalArgumentException(
        s"Invalid snapshot header length $headerLength for a payload of ${bytes.length} bytes")

    val headerEnd = LengthPrefixSize + headerLength
    val headerBytes = bytes.slice(LengthPrefixSize, headerEnd)
    val snapshotBytes = bytes.drop(headerEnd)

    // `.get` deliberately rethrows deserialization and class-loading failures to the caller.
    val header = extension.deserialize(headerBytes, classOf[SnapshotHeader]).get
    val manifest = header.manifest.map(system.dynamicAccess.getClassFor[AnyRef](_).get)

    extension.deserialize[AnyRef](snapshotBytes, header.serializerId, manifest).get
  }

  /**
   * Writes `i` as four bytes, least significant byte first.
   * `OutputStream.write(Int)` emits only the low-order byte of its argument.
   */
  private def writeInt(outputStream: OutputStream, i: Int): Unit =
    (0 to 24 by 8).foreach { shift ⇒ outputStream.write(i >> shift) }

  /**
   * Reads four bytes, least significant byte first, and assembles them into an `Int`.
   *
   * @throws IllegalArgumentException if the stream ends before four bytes were read
   */
  private def readInt(inputStream: InputStream): Int =
    (0 to 24 by 8).foldLeft(0) { (id, shift) ⇒
      val next = inputStream.read()
      // `read()` signals end of stream with -1; OR-ing that in would corrupt the result.
      if (next < 0)
        throw new IllegalArgumentException(
          s"Truncated snapshot: expected a $LengthPrefixSize-byte header length prefix")
      id | (next << shift)
    }
}

// Other Akka source code examples
// Here is a short list of links related to this Akka SnapshotSerializer.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.